import type * as Arr from "effect/Array" import { NodeFileSystem, NodeSink, NodeStream } from "@effect/platform-node" import * as NodePath from "@effect/platform-node/NodePath" import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import * as FileSystem from "effect/FileSystem" import * as Layer from "effect/Layer" import * as Path from "effect/Path" import * as PlatformError from "effect/PlatformError" import * as Predicate from "effect/Predicate" import type * as Scope from "effect/Scope" import * as Sink from "effect/Sink" import * as Stream from "effect/Stream" import * as ChildProcess from "effect/unstable/process/ChildProcess" import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner" import { ChildProcessSpawner, ExitCode, make as makeSpawner, makeHandle, ProcessId, } from "effect/unstable/process/ChildProcessSpawner" import * as NodeChildProcess from "node:child_process" import { PassThrough } from "node:stream" import launch from "cross-spawn" const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err))) const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => { switch (err.code) { case "ENOENT": return "NotFound" case "EACCES": return "PermissionDenied" case "EEXIST": return "AlreadyExists" case "EISDIR": return "BadResource" case "ENOTDIR": return "BadResource" case "EBUSY": return "Busy" case "ELOOP": return "BadResource" default: return "Unknown" } } const flatten = (command: ChildProcess.Command) => { const commands: Array = [] const opts: Array = [] const walk = (cmd: ChildProcess.Command): void => { switch (cmd._tag) { case "StandardCommand": commands.push(cmd) return case "PipedCommand": walk(cmd.left) opts.push(cmd.options) walk(cmd.right) return } } walk(command) if (commands.length === 0) throw new Error("flatten produced empty commands array") const [head, ...tail] = commands return { commands: [head, ...tail] as Arr.NonEmptyReadonlyArray, opts, } } const toPlatformError = ( method: string, err: NodeJS.ErrnoException, command: ChildProcess.Command, ): PlatformError.PlatformError => { const cmd = flatten(command) .commands.map((x) => `${x.command} ${x.args.join(" ")}`) .join(" | ") return PlatformError.systemError({ _tag: toTag(err), module: "ChildProcess", method, pathOrDescriptor: cmd, syscall: err.syscall, cause: err, }) } type ExitSignal = Deferred.Deferred export const make = Effect.gen(function* () { const fs = yield* FileSystem.FileSystem const path = yield* Path.Path const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) { if (Predicate.isUndefined(opts.cwd)) return undefined yield* fs.access(opts.cwd) return path.resolve(opts.cwd) }) const env = (opts: ChildProcess.CommandOptions) => opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined => Stream.isStream(x) ? "pipe" : x const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined => Sink.isSink(x) ? "pipe" : x const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => { const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true } if (Predicate.isUndefined(opts.stdin)) return cfg if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin } if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin } return { stream: opts.stdin.stream, encoding: opts.stdin.encoding ?? cfg.encoding, endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone, } } const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => { const cfg = opts[key] if (Predicate.isUndefined(cfg)) return { stream: "pipe" } if (typeof cfg === "string") return { stream: cfg } if (Sink.isSink(cfg)) return { stream: cfg } return { stream: cfg.stream } } const fds = (opts: ChildProcess.CommandOptions) => { if (Predicate.isUndefined(opts.additionalFds)) return [] return Object.entries(opts.additionalFds) .flatMap(([name, config]) => { const fd = ChildProcess.parseFdName(name) return Predicate.isUndefined(fd) ? [] : [{ fd, config }] }) .toSorted((a, b) => a.fd - b.fd) } const stdios = ( sin: ChildProcess.StdinConfig, sout: ChildProcess.StdoutConfig, serr: ChildProcess.StderrConfig, extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, ): NodeChildProcess.StdioOptions => { const pipe = (x: NodeChildProcess.IOType | undefined) => process.platform === "win32" && x === "pipe" ? "overlapped" : x const arr: Array = [ pipe(input(sin.stream)), pipe(output(sout.stream)), pipe(output(serr.stream)), ] if (extra.length === 0) return arr as NodeChildProcess.StdioOptions const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2) for (let i = 3; i <= max; i++) arr[i] = "ignore" for (const x of extra) arr[x.fd] = pipe("pipe") return arr as NodeChildProcess.StdioOptions } const setupFds = Effect.fnUntraced(function* ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, ) { if (extra.length === 0) { return { getInputFd: () => Sink.drain, getOutputFd: () => Stream.empty, } } const ins = new Map>() const outs = new Map>() for (const x of extra) { const node = proc.stdio[x.fd] switch (x.config.type) { case "input": { let sink: Sink.Sink = Sink.drain if (node && "write" in node) { sink = NodeSink.fromWritable({ evaluate: () => node, onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command), endOnDone: true, }) } if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink)) ins.set(x.fd, sink) break } case "output": { let stream: Stream.Stream = Stream.empty if (node && "read" in node) { const tap = new PassThrough() node.on("error", (err) => tap.destroy(toError(err))) node.pipe(tap) stream = NodeStream.fromReadable({ evaluate: () => tap, onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command), }) } if (x.config.sink) stream = Stream.transduce(stream, x.config.sink) outs.set(x.fd, stream) break } } } return { getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain, getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty, } }) const setupStdin = ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, cfg: ChildProcess.StdinConfig, ) => Effect.suspend(() => { let sink: Sink.Sink = Sink.drain if (Predicate.isNotNull(proc.stdin)) { sink = NodeSink.fromWritable({ evaluate: () => proc.stdin!, onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command), endOnDone: cfg.endOnDone, encoding: cfg.encoding, }) } if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink) return Effect.succeed(sink) }) const setupOutput = ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, out: ChildProcess.StdoutConfig, err: ChildProcess.StderrConfig, ) => { let stdout = proc.stdout ? NodeStream.fromReadable({ evaluate: () => proc.stdout!, onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command), }) : Stream.empty let stderr = proc.stderr ? NodeStream.fromReadable({ evaluate: () => proc.stderr!, onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command), }) : Stream.empty if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream) if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream) return { stdout, stderr, all: Stream.merge(stdout, stderr) } } const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) => Effect.callback((resume) => { const signal = Deferred.makeUnsafe() const proc = launch(command.command, command.args, opts) let end = false let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined proc.on("error", (err) => { resume(Effect.fail(toPlatformError("spawn", err, command))) }) proc.on("exit", (...args) => { exit = args }) proc.on("close", (...args) => { if (end) return end = true Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args)) }) proc.on("spawn", () => { resume(Effect.succeed([proc, signal])) }) return Effect.sync(() => { proc.kill("SIGTERM") }) }) const killGroup = ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, signal: NodeJS.Signals, ) => { if (globalThis.process.platform === "win32") { return Effect.callback((resume) => { NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, { windowsHide: true }, (err) => { if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command))) resume(Effect.void) }) }) } return Effect.try({ try: () => { globalThis.process.kill(-proc.pid!, signal) }, catch: (err) => toPlatformError("kill", toError(err), command), }) } const killOne = ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, signal: NodeJS.Signals, ) => Effect.suspend(() => { if (proc.kill(signal)) return Effect.void return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command)) }) const timeout = ( proc: NodeChildProcess.ChildProcess, command: ChildProcess.StandardCommand, opts: ChildProcess.KillOptions | undefined, ) => ( f: ( command: ChildProcess.StandardCommand, proc: NodeChildProcess.ChildProcess, signal: NodeJS.Signals, ) => Effect.Effect, ) => { const signal = opts?.killSignal ?? "SIGTERM" if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal) return Effect.timeoutOrElse(f(command, proc, signal), { duration: opts.forceKillAfter, orElse: () => f(command, proc, "SIGKILL"), }) } const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => { const opt = from ?? "stdout" switch (opt) { case "stdout": return handle.stdout case "stderr": return handle.stderr case "all": return handle.all default: { const fd = ChildProcess.parseFdName(opt) return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout } } } const spawnCommand: ( command: ChildProcess.Command, ) => Effect.Effect = Effect.fnUntraced( function* (command) { switch (command._tag) { case "StandardCommand": { const sin = stdin(command.options) const sout = stdio(command.options, "stdout") const serr = stdio(command.options, "stderr") const extra = fds(command.options) const dir = yield* cwd(command.options) const [proc, signal] = yield* Effect.acquireRelease( spawn(command, { cwd: dir, env: env(command.options), stdio: stdios(sin, sout, serr, extra), detached: command.options.detached ?? process.platform !== "win32", shell: command.options.shell, windowsHide: process.platform === "win32", }), Effect.fnUntraced(function* ([proc, signal]) { const done = yield* Deferred.isDone(signal) const kill = timeout(proc, command, command.options) if (done) { const [code] = yield* Deferred.await(signal) if (process.platform === "win32") return yield* Effect.void if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup)) return yield* Effect.void } const send = (s: NodeJS.Signals) => Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) const sig = command.options.killSignal ?? "SIGTERM" const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) const escalated = command.options.forceKillAfter ? Effect.timeoutOrElse(attempt, { duration: command.options.forceKillAfter, orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), }) : attempt return yield* Effect.ignore(escalated) }), ) const fd = yield* setupFds(command, proc, extra) const out = setupOutput(command, proc, sout, serr) let ref = true return makeHandle({ pid: ProcessId(proc.pid!), stdin: yield* setupStdin(command, proc, sin), stdout: out.stdout, stderr: out.stderr, all: out.all, getInputFd: fd.getInputFd, getOutputFd: fd.getOutputFd, isRunning: Effect.map(Deferred.isDone(signal), (done) => !done), exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => { if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code)) return Effect.fail( toPlatformError( "exitCode", new Error(`Process interrupted due to receipt of signal: '${signal}'`), command, ), ) }), kill: (opts?: ChildProcess.KillOptions) => { const sig = opts?.killSignal ?? "SIGTERM" const send = (s: NodeJS.Signals) => Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) if (!opts?.forceKillAfter) return attempt return Effect.timeoutOrElse(attempt, { duration: opts.forceKillAfter, orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), }) }, unref: Effect.sync(() => { if (ref) { proc.unref() ref = false } return Effect.sync(() => { if (!ref) { proc.ref() ref = true } }) }), }) } case "PipedCommand": { const flat = flatten(command) const [head, ...tail] = flat.commands let handle = spawnCommand(head) for (let i = 0; i < tail.length; i++) { const next = tail[i] const opts = flat.opts[i] ?? {} const sin = stdin(next.options) const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from))) const to = opts.to ?? "stdin" if (to === "stdin") { handle = spawnCommand( ChildProcess.make(next.command, next.args, { ...next.options, stdin: { ...sin, stream }, }), ) continue } const fd = ChildProcess.parseFdName(to) if (Predicate.isUndefined(fd)) { handle = spawnCommand( ChildProcess.make(next.command, next.args, { ...next.options, stdin: { ...sin, stream }, }), ) continue } handle = spawnCommand( ChildProcess.make(next.command, next.args, { ...next.options, additionalFds: { ...next.options.additionalFds, [ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream }, }, }), ) } return yield* handle } } }, ) return makeSpawner(spawnCommand) }) export const layer: Layer.Layer = Layer.effect( ChildProcessSpawner, make, ) export const defaultLayer = layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer)) export * as CrossSpawnSpawner from "./cross-spawn-spawner"