mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-23 23:24:56 +00:00
Compare commits
3 Commits
dev
...
kit/cross-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5da672be32 | ||
|
|
a9621936fa | ||
|
|
9925faa465 |
464
packages/opencode/src/effect/cross-spawn-spawner.ts
Normal file
464
packages/opencode/src/effect/cross-spawn-spawner.ts
Normal file
@@ -0,0 +1,464 @@
|
||||
import type * as Arr from "effect/Array"
|
||||
import { NodeSink, NodeStream } from "@effect/platform-node"
|
||||
import * as Deferred from "effect/Deferred"
|
||||
import * as Effect from "effect/Effect"
|
||||
import * as Exit from "effect/Exit"
|
||||
import * as FileSystem from "effect/FileSystem"
|
||||
import * as Layer from "effect/Layer"
|
||||
import * as Path from "effect/Path"
|
||||
import * as PlatformError from "effect/PlatformError"
|
||||
import * as Predicate from "effect/Predicate"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as Sink from "effect/Sink"
|
||||
import * as Stream from "effect/Stream"
|
||||
import * as ChildProcess from "effect/unstable/process/ChildProcess"
|
||||
import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner"
|
||||
import {
|
||||
ChildProcessSpawner,
|
||||
ExitCode,
|
||||
make as makeSpawner,
|
||||
makeHandle,
|
||||
ProcessId,
|
||||
} from "effect/unstable/process/ChildProcessSpawner"
|
||||
import * as NodeChildProcess from "node:child_process"
|
||||
import { PassThrough } from "node:stream"
|
||||
import launch from "cross-spawn"
|
||||
|
||||
const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err)))
|
||||
|
||||
const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => {
|
||||
switch (err.code) {
|
||||
case "ENOENT":
|
||||
return "NotFound"
|
||||
case "EACCES":
|
||||
return "PermissionDenied"
|
||||
case "EEXIST":
|
||||
return "AlreadyExists"
|
||||
case "EISDIR":
|
||||
return "BadResource"
|
||||
case "ENOTDIR":
|
||||
return "BadResource"
|
||||
case "EBUSY":
|
||||
return "Busy"
|
||||
case "ELOOP":
|
||||
return "BadResource"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
const flatten = (command: ChildProcess.Command) => {
|
||||
const commands: Array<ChildProcess.StandardCommand> = []
|
||||
const opts: Array<ChildProcess.PipeOptions> = []
|
||||
|
||||
const walk = (cmd: ChildProcess.Command): void => {
|
||||
switch (cmd._tag) {
|
||||
case "StandardCommand":
|
||||
commands.push(cmd)
|
||||
return
|
||||
case "PipedCommand":
|
||||
walk(cmd.left)
|
||||
opts.push(cmd.options)
|
||||
walk(cmd.right)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
walk(command)
|
||||
if (commands.length === 0) throw new Error("flatten produced empty commands array")
|
||||
const [head, ...tail] = commands
|
||||
return {
|
||||
commands: [head, ...tail] as Arr.NonEmptyReadonlyArray<ChildProcess.StandardCommand>,
|
||||
opts,
|
||||
}
|
||||
}
|
||||
|
||||
const toPlatformError = (
|
||||
method: string,
|
||||
err: NodeJS.ErrnoException,
|
||||
command: ChildProcess.Command,
|
||||
): PlatformError.PlatformError => {
|
||||
const cmd = flatten(command)
|
||||
.commands.map((x) => `${x.command} ${x.args.join(" ")}`)
|
||||
.join(" | ")
|
||||
return PlatformError.systemError({
|
||||
_tag: toTag(err),
|
||||
module: "ChildProcess",
|
||||
method,
|
||||
pathOrDescriptor: cmd,
|
||||
syscall: err.syscall,
|
||||
cause: err,
|
||||
})
|
||||
}
|
||||
|
||||
type ExitSignal = Deferred.Deferred<readonly [code: number | null, signal: NodeJS.Signals | null]>
|
||||
|
||||
export const make = Effect.gen(function* () {
|
||||
const fs = yield* FileSystem.FileSystem
|
||||
const path = yield* Path.Path
|
||||
|
||||
const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) {
|
||||
if (Predicate.isUndefined(opts.cwd)) return undefined
|
||||
yield* fs.access(opts.cwd)
|
||||
return path.resolve(opts.cwd)
|
||||
})
|
||||
|
||||
const env = (opts: ChildProcess.CommandOptions) =>
|
||||
opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env
|
||||
|
||||
const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined =>
|
||||
Stream.isStream(x) ? "pipe" : x
|
||||
|
||||
const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined =>
|
||||
Sink.isSink(x) ? "pipe" : x
|
||||
|
||||
const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => {
|
||||
const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true }
|
||||
if (Predicate.isUndefined(opts.stdin)) return cfg
|
||||
if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin }
|
||||
if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin }
|
||||
return {
|
||||
stream: opts.stdin.stream,
|
||||
encoding: opts.stdin.encoding ?? cfg.encoding,
|
||||
endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone,
|
||||
}
|
||||
}
|
||||
|
||||
const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => {
|
||||
const cfg = opts[key]
|
||||
if (Predicate.isUndefined(cfg)) return { stream: "pipe" }
|
||||
if (typeof cfg === "string") return { stream: cfg }
|
||||
if (Sink.isSink(cfg)) return { stream: cfg }
|
||||
return { stream: cfg.stream }
|
||||
}
|
||||
|
||||
const fds = (opts: ChildProcess.CommandOptions) => {
|
||||
if (Predicate.isUndefined(opts.additionalFds)) return []
|
||||
return Object.entries(opts.additionalFds)
|
||||
.flatMap(([name, config]) => {
|
||||
const fd = ChildProcess.parseFdName(name)
|
||||
return Predicate.isUndefined(fd) ? [] : [{ fd, config }]
|
||||
})
|
||||
.toSorted((a, b) => a.fd - b.fd)
|
||||
}
|
||||
|
||||
const stdios = (
|
||||
sin: ChildProcess.StdinConfig,
|
||||
sout: ChildProcess.StdoutConfig,
|
||||
serr: ChildProcess.StderrConfig,
|
||||
extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
|
||||
): NodeChildProcess.StdioOptions => {
|
||||
const arr: Array<NodeChildProcess.IOType | undefined> = [
|
||||
input(sin.stream),
|
||||
output(sout.stream),
|
||||
output(serr.stream),
|
||||
]
|
||||
if (extra.length === 0) return arr as NodeChildProcess.StdioOptions
|
||||
const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2)
|
||||
for (let i = 3; i <= max; i++) arr[i] = "ignore"
|
||||
for (const x of extra) arr[x.fd] = "pipe"
|
||||
return arr as NodeChildProcess.StdioOptions
|
||||
}
|
||||
|
||||
const setupFds = Effect.fnUntraced(function* (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
|
||||
) {
|
||||
if (extra.length === 0) {
|
||||
return {
|
||||
getInputFd: () => Sink.drain,
|
||||
getOutputFd: () => Stream.empty,
|
||||
}
|
||||
}
|
||||
|
||||
const ins = new Map<number, Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError>>()
|
||||
const outs = new Map<number, Stream.Stream<Uint8Array, PlatformError.PlatformError>>()
|
||||
|
||||
for (const x of extra) {
|
||||
const node = proc.stdio[x.fd]
|
||||
switch (x.config.type) {
|
||||
case "input": {
|
||||
let sink: Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError> = Sink.drain
|
||||
if (node && "write" in node) {
|
||||
sink = NodeSink.fromWritable({
|
||||
evaluate: () => node,
|
||||
onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command),
|
||||
})
|
||||
}
|
||||
if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink))
|
||||
ins.set(x.fd, sink)
|
||||
break
|
||||
}
|
||||
case "output": {
|
||||
let stream: Stream.Stream<Uint8Array, PlatformError.PlatformError> = Stream.empty
|
||||
if (node && "read" in node) {
|
||||
const tap = new PassThrough()
|
||||
node.on("error", (err) => tap.destroy(toError(err)))
|
||||
node.pipe(tap)
|
||||
stream = NodeStream.fromReadable({
|
||||
evaluate: () => tap,
|
||||
onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command),
|
||||
})
|
||||
}
|
||||
if (x.config.sink) stream = Stream.transduce(stream, x.config.sink)
|
||||
outs.set(x.fd, stream)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain,
|
||||
getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty,
|
||||
}
|
||||
})
|
||||
|
||||
const setupStdin = (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
cfg: ChildProcess.StdinConfig,
|
||||
) =>
|
||||
Effect.suspend(() => {
|
||||
let sink: Sink.Sink<void, unknown, never, PlatformError.PlatformError> = Sink.drain
|
||||
if (Predicate.isNotNull(proc.stdin)) {
|
||||
sink = NodeSink.fromWritable({
|
||||
evaluate: () => proc.stdin!,
|
||||
onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command),
|
||||
endOnDone: cfg.endOnDone,
|
||||
encoding: cfg.encoding,
|
||||
})
|
||||
}
|
||||
if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink)
|
||||
return Effect.succeed(sink)
|
||||
})
|
||||
|
||||
const setupOutput = (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
out: ChildProcess.StdoutConfig,
|
||||
err: ChildProcess.StderrConfig,
|
||||
) => {
|
||||
let stdout = proc.stdout
|
||||
? NodeStream.fromReadable({
|
||||
evaluate: () => proc.stdout!,
|
||||
onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command),
|
||||
})
|
||||
: Stream.empty
|
||||
let stderr = proc.stderr
|
||||
? NodeStream.fromReadable({
|
||||
evaluate: () => proc.stderr!,
|
||||
onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command),
|
||||
})
|
||||
: Stream.empty
|
||||
|
||||
if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream)
|
||||
if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream)
|
||||
|
||||
return { stdout, stderr, all: Stream.merge(stdout, stderr) }
|
||||
}
|
||||
|
||||
const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) =>
|
||||
Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => {
|
||||
const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
|
||||
const proc = launch(command.command, command.args, opts)
|
||||
proc.on("error", (err) => {
|
||||
resume(Effect.fail(toPlatformError("spawn", err, command)))
|
||||
})
|
||||
proc.on("exit", (...args) => {
|
||||
Deferred.doneUnsafe(signal, Exit.succeed(args))
|
||||
})
|
||||
proc.on("spawn", () => {
|
||||
resume(Effect.succeed([proc, signal]))
|
||||
})
|
||||
return Effect.sync(() => {
|
||||
proc.kill("SIGTERM")
|
||||
})
|
||||
})
|
||||
|
||||
const killGroup = (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
signal: NodeJS.Signals,
|
||||
) => {
|
||||
if (globalThis.process.platform === "win32") {
|
||||
return Effect.callback<void, PlatformError.PlatformError>((resume) => {
|
||||
NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, (err) => {
|
||||
if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command)))
|
||||
resume(Effect.void)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
return Effect.try({
|
||||
try: () => {
|
||||
globalThis.process.kill(-proc.pid!, signal)
|
||||
},
|
||||
catch: (err) => toPlatformError("kill", toError(err), command),
|
||||
})
|
||||
}
|
||||
|
||||
const killOne = (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
signal: NodeJS.Signals,
|
||||
) =>
|
||||
Effect.suspend(() => {
|
||||
if (proc.kill(signal)) return Effect.void
|
||||
return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command))
|
||||
})
|
||||
|
||||
const timeout =
|
||||
(
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
command: ChildProcess.StandardCommand,
|
||||
opts: ChildProcess.KillOptions | undefined,
|
||||
) =>
|
||||
<A, E, R>(
|
||||
f: (
|
||||
command: ChildProcess.StandardCommand,
|
||||
proc: NodeChildProcess.ChildProcess,
|
||||
signal: NodeJS.Signals,
|
||||
) => Effect.Effect<A, E, R>,
|
||||
) => {
|
||||
const signal = opts?.killSignal ?? "SIGTERM"
|
||||
if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal)
|
||||
return Effect.timeoutOrElse(f(command, proc, signal), {
|
||||
duration: opts.forceKillAfter,
|
||||
onTimeout: () => f(command, proc, "SIGKILL"),
|
||||
})
|
||||
}
|
||||
|
||||
const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => {
|
||||
const opt = from ?? "stdout"
|
||||
switch (opt) {
|
||||
case "stdout":
|
||||
return handle.stdout
|
||||
case "stderr":
|
||||
return handle.stderr
|
||||
case "all":
|
||||
return handle.all
|
||||
default: {
|
||||
const fd = ChildProcess.parseFdName(opt)
|
||||
return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const spawnCommand: (
|
||||
command: ChildProcess.Command,
|
||||
) => Effect.Effect<ChildProcessHandle, PlatformError.PlatformError, Scope.Scope> = Effect.fnUntraced(
|
||||
function* (command) {
|
||||
switch (command._tag) {
|
||||
case "StandardCommand": {
|
||||
const sin = stdin(command.options)
|
||||
const sout = stdio(command.options, "stdout")
|
||||
const serr = stdio(command.options, "stderr")
|
||||
const extra = fds(command.options)
|
||||
const dir = yield* cwd(command.options)
|
||||
|
||||
const [proc, signal] = yield* Effect.acquireRelease(
|
||||
spawn(command, {
|
||||
cwd: dir,
|
||||
env: env(command.options),
|
||||
stdio: stdios(sin, sout, serr, extra),
|
||||
detached: command.options.detached ?? process.platform !== "win32",
|
||||
shell: command.options.shell,
|
||||
}),
|
||||
Effect.fnUntraced(function* ([proc, signal]) {
|
||||
const done = yield* Deferred.isDone(signal)
|
||||
const kill = timeout(proc, command, command.options)
|
||||
if (done) {
|
||||
const [code] = yield* Deferred.await(signal)
|
||||
if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup))
|
||||
return yield* Effect.void
|
||||
}
|
||||
return yield* kill((command, proc, signal) =>
|
||||
Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
|
||||
).pipe(Effect.andThen(Deferred.await(signal)), Effect.ignore)
|
||||
}),
|
||||
)
|
||||
|
||||
const fd = yield* setupFds(command, proc, extra)
|
||||
const out = setupOutput(command, proc, sout, serr)
|
||||
return makeHandle({
|
||||
pid: ProcessId(proc.pid!),
|
||||
stdin: yield* setupStdin(command, proc, sin),
|
||||
stdout: out.stdout,
|
||||
stderr: out.stderr,
|
||||
all: out.all,
|
||||
getInputFd: fd.getInputFd,
|
||||
getOutputFd: fd.getOutputFd,
|
||||
isRunning: Effect.map(Deferred.isDone(signal), (done) => !done),
|
||||
exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => {
|
||||
if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code))
|
||||
return Effect.fail(
|
||||
toPlatformError(
|
||||
"exitCode",
|
||||
new Error(`Process interrupted due to receipt of signal: '${signal}'`),
|
||||
command,
|
||||
),
|
||||
)
|
||||
}),
|
||||
kill: (opts?: ChildProcess.KillOptions) =>
|
||||
timeout(
|
||||
proc,
|
||||
command,
|
||||
opts,
|
||||
)((command, proc, signal) =>
|
||||
Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
|
||||
).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
|
||||
})
|
||||
}
|
||||
case "PipedCommand": {
|
||||
const flat = flatten(command)
|
||||
const [head, ...tail] = flat.commands
|
||||
let handle = spawnCommand(head)
|
||||
for (let i = 0; i < tail.length; i++) {
|
||||
const next = tail[i]
|
||||
const opts = flat.opts[i] ?? {}
|
||||
const sin = stdin(next.options)
|
||||
const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from)))
|
||||
const to = opts.to ?? "stdin"
|
||||
if (to === "stdin") {
|
||||
handle = spawnCommand(
|
||||
ChildProcess.make(next.command, next.args, {
|
||||
...next.options,
|
||||
stdin: { ...sin, stream },
|
||||
}),
|
||||
)
|
||||
continue
|
||||
}
|
||||
const fd = ChildProcess.parseFdName(to)
|
||||
if (Predicate.isUndefined(fd)) {
|
||||
handle = spawnCommand(
|
||||
ChildProcess.make(next.command, next.args, {
|
||||
...next.options,
|
||||
stdin: { ...sin, stream },
|
||||
}),
|
||||
)
|
||||
continue
|
||||
}
|
||||
handle = spawnCommand(
|
||||
ChildProcess.make(next.command, next.args, {
|
||||
...next.options,
|
||||
additionalFds: {
|
||||
...next.options.additionalFds,
|
||||
[ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream },
|
||||
},
|
||||
}),
|
||||
)
|
||||
}
|
||||
return yield* handle
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
return makeSpawner(spawnCommand)
|
||||
})
|
||||
|
||||
export const layer: Layer.Layer<ChildProcessSpawner, never, FileSystem.FileSystem | Path.Path> = Layer.effect(
|
||||
ChildProcessSpawner,
|
||||
make,
|
||||
)
|
||||
@@ -1,6 +1,7 @@
|
||||
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
|
||||
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { makeRunPromise } from "@/effect/run-service"
|
||||
import { withTransientReadRetry } from "@/util/effect-http-client"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
@@ -340,7 +341,7 @@ export namespace Installation {
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(FetchHttpClient.layer),
|
||||
Layer.provide(NodeChildProcessSpawner.layer),
|
||||
Layer.provide(CrossSpawnSpawner.layer),
|
||||
Layer.provide(NodeFileSystem.layer),
|
||||
Layer.provide(NodePath.layer),
|
||||
)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import path from "path"
|
||||
import z from "zod"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRunPromise } from "@/effect/run-service"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
@@ -354,9 +355,9 @@ export namespace Snapshot {
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(NodeChildProcessSpawner.layer),
|
||||
Layer.provide(CrossSpawnSpawner.layer),
|
||||
Layer.provide(AppFileSystem.defaultLayer),
|
||||
Layer.provide(NodeFileSystem.layer), // needed by NodeChildProcessSpawner
|
||||
Layer.provide(NodeFileSystem.layer), // needed by CrossSpawnSpawner
|
||||
Layer.provide(NodePath.layer),
|
||||
)
|
||||
|
||||
|
||||
402
packages/opencode/test/effect/cross-spawn-spawner.test.ts
Normal file
402
packages/opencode/test/effect/cross-spawn-spawner.test.ts
Normal file
@@ -0,0 +1,402 @@
|
||||
import { NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import fs from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
import { Effect, Exit, Layer, Stream } from "effect"
|
||||
import type * as PlatformError from "effect/PlatformError"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
const live = CrossSpawnSpawner.layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer))
|
||||
|
||||
function run<A, E>(effect: Effect.Effect<A, E, ChildProcessSpawner.ChildProcessSpawner>) {
|
||||
return Effect.runPromise(effect.pipe(Effect.provide(live)))
|
||||
}
|
||||
|
||||
function runScoped<A, E>(
|
||||
effect: Effect.Effect<A, E, ChildProcessSpawner.ChildProcessSpawner | import("effect/Scope").Scope>,
|
||||
) {
|
||||
return Effect.runPromise(Effect.scoped(effect).pipe(Effect.provide(live)))
|
||||
}
|
||||
|
||||
function decodeByteStream(stream: Stream.Stream<Uint8Array, PlatformError.PlatformError>) {
|
||||
return Stream.runCollect(stream).pipe(
|
||||
Effect.map((chunks) => {
|
||||
const totalLength = chunks.reduce((acc, c) => acc + c.length, 0)
|
||||
const result = new Uint8Array(totalLength)
|
||||
let offset = 0
|
||||
for (const chunk of chunks) {
|
||||
result.set(chunk, offset)
|
||||
offset += chunk.length
|
||||
}
|
||||
return new TextDecoder("utf-8").decode(result).trim()
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
describe("cross-spawn spawner", () => {
|
||||
describe("basic spawning", () => {
|
||||
test("captures stdout", async () => {
|
||||
const out = await run(
|
||||
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
|
||||
svc.string(ChildProcess.make(process.execPath, ["-e", 'process.stdout.write("ok")'])),
|
||||
),
|
||||
)
|
||||
expect(out).toBe("ok")
|
||||
})
|
||||
|
||||
test("captures multiple lines", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo line1; echo line2; echo line3"])
|
||||
return yield* decodeByteStream(handle.stdout)
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("line1\nline2\nline3")
|
||||
})
|
||||
|
||||
test("returns exit code", async () => {
|
||||
const code = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "exit 0"])
|
||||
return yield* handle.exitCode
|
||||
}),
|
||||
)
|
||||
expect(code).toBe(ChildProcessSpawner.ExitCode(0))
|
||||
})
|
||||
|
||||
test("returns non-zero exit code", async () => {
|
||||
const code = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "exit 42"])
|
||||
return yield* handle.exitCode
|
||||
}),
|
||||
)
|
||||
expect(code).toBe(ChildProcessSpawner.ExitCode(42))
|
||||
})
|
||||
})
|
||||
|
||||
describe("cwd option", () => {
|
||||
test("uses cwd when spawning commands", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
const out = await run(
|
||||
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
|
||||
svc.string(
|
||||
ChildProcess.make(process.execPath, ["-e", "process.stdout.write(process.cwd())"], { cwd: tmp.path }),
|
||||
),
|
||||
),
|
||||
)
|
||||
expect(out).toBe(tmp.path)
|
||||
})
|
||||
|
||||
test("fails for invalid cwd", async () => {
|
||||
const exit = await Effect.runPromiseExit(
|
||||
Effect.scoped(ChildProcess.make("echo", ["test"], { cwd: "/nonexistent/directory/path" }).asEffect()).pipe(
|
||||
Effect.provide(live),
|
||||
),
|
||||
)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe("env option", () => {
|
||||
test("passes environment variables with extendEnv", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo $TEST_VAR"], {
|
||||
env: { TEST_VAR: "test_value" },
|
||||
extendEnv: true,
|
||||
})
|
||||
return yield* decodeByteStream(handle.stdout)
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("test_value")
|
||||
})
|
||||
|
||||
test("passes multiple environment variables", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo $VAR1-$VAR2-$VAR3"], {
|
||||
env: { VAR1: "one", VAR2: "two", VAR3: "three" },
|
||||
extendEnv: true,
|
||||
})
|
||||
return yield* decodeByteStream(handle.stdout)
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("one-two-three")
|
||||
})
|
||||
})
|
||||
|
||||
describe("stderr", () => {
|
||||
test("captures stderr output", async () => {
|
||||
const err = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo error message >&2"])
|
||||
return yield* decodeByteStream(handle.stderr)
|
||||
}),
|
||||
)
|
||||
expect(err).toBe("error message")
|
||||
})
|
||||
|
||||
test("captures both stdout and stderr", async () => {
|
||||
const { stdout, stderr } = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo stdout; echo stderr >&2"])
|
||||
const [stdout, stderr] = yield* Effect.all([decodeByteStream(handle.stdout), decodeByteStream(handle.stderr)])
|
||||
return { stdout, stderr }
|
||||
}),
|
||||
)
|
||||
expect(stdout).toBe("stdout")
|
||||
expect(stderr).toBe("stderr")
|
||||
})
|
||||
})
|
||||
|
||||
describe("combined output (all)", () => {
|
||||
test("captures stdout via .all when no stderr", async () => {
|
||||
const all = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("echo", ["hello from stdout"])
|
||||
return yield* decodeByteStream(handle.all)
|
||||
}),
|
||||
)
|
||||
expect(all).toBe("hello from stdout")
|
||||
})
|
||||
|
||||
test("captures stderr via .all when no stdout", async () => {
|
||||
const all = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo hello from stderr >&2"])
|
||||
return yield* decodeByteStream(handle.all)
|
||||
}),
|
||||
)
|
||||
expect(all).toBe("hello from stderr")
|
||||
})
|
||||
})
|
||||
|
||||
describe("stdin", () => {
|
||||
test("allows providing standard input to a command", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const input = "a b c"
|
||||
const stdin = Stream.make(Buffer.from(input, "utf-8"))
|
||||
const handle = yield* ChildProcess.make("cat", { stdin })
|
||||
const output = yield* decodeByteStream(handle.stdout)
|
||||
yield* handle.exitCode
|
||||
return output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("a b c")
|
||||
})
|
||||
})
|
||||
|
||||
describe("process control", () => {
|
||||
test("kills a running process", async () => {
|
||||
const exit = await Effect.runPromiseExit(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sleep", ["10"])
|
||||
yield* handle.kill()
|
||||
return yield* handle.exitCode
|
||||
}),
|
||||
).pipe(Effect.provide(live)),
|
||||
)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
})
|
||||
|
||||
test("isRunning reflects process state", async () => {
|
||||
await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo done"])
|
||||
yield* handle.exitCode
|
||||
const running = yield* handle.isRunning
|
||||
expect(running).toBe(false)
|
||||
}),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe("error handling", () => {
|
||||
test("fails for invalid command", async () => {
|
||||
const exit = await Effect.runPromiseExit(
|
||||
Effect.scoped(ChildProcess.make("nonexistent-command-12345").asEffect()).pipe(Effect.provide(live)),
|
||||
)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe("pipeline", () => {
|
||||
test("pipes stdout of one command to stdin of another", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make`echo hello world`.pipe(
|
||||
ChildProcess.pipeTo(ChildProcess.make`tr a-z A-Z`),
|
||||
)
|
||||
const output = yield* decodeByteStream(handle.stdout)
|
||||
yield* handle.exitCode
|
||||
return output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("HELLO WORLD")
|
||||
})
|
||||
|
||||
test("three-stage pipeline", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make`echo hello world`.pipe(
|
||||
ChildProcess.pipeTo(ChildProcess.make`tr a-z A-Z`),
|
||||
ChildProcess.pipeTo(ChildProcess.make("tr", [" ", "-"])),
|
||||
)
|
||||
const output = yield* decodeByteStream(handle.stdout)
|
||||
yield* handle.exitCode
|
||||
return output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("HELLO-WORLD")
|
||||
})
|
||||
|
||||
test("pipes stderr with { from: 'stderr' }", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo error >&2"]).pipe(
|
||||
ChildProcess.pipeTo(ChildProcess.make`cat`, { from: "stderr" }),
|
||||
)
|
||||
const output = yield* decodeByteStream(handle.stdout)
|
||||
yield* handle.exitCode
|
||||
return output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("error")
|
||||
})
|
||||
})
|
||||
|
||||
describe("additional fds", () => {
|
||||
test("reads data from output fd3", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo 'hello from fd3' >&3"], {
|
||||
additionalFds: { fd3: { type: "output" } },
|
||||
})
|
||||
const fd3Output = yield* decodeByteStream(handle.getOutputFd(3))
|
||||
yield* handle.exitCode
|
||||
return fd3Output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("hello from fd3")
|
||||
})
|
||||
|
||||
test("writes data to input fd3", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const inputData = "data from parent"
|
||||
const inputStream = Stream.make(new TextEncoder().encode(inputData))
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "cat <&3"], {
|
||||
additionalFds: { fd3: { type: "input", stream: inputStream } },
|
||||
})
|
||||
const stdout = yield* decodeByteStream(handle.stdout)
|
||||
yield* handle.exitCode
|
||||
return stdout
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("data from parent")
|
||||
})
|
||||
|
||||
test("returns empty stream for unconfigured fd", async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo test"])
|
||||
const fd3Output = yield* decodeByteStream(handle.getOutputFd(3))
|
||||
yield* handle.exitCode
|
||||
return fd3Output
|
||||
}),
|
||||
)
|
||||
expect(out).toBe("")
|
||||
})
|
||||
|
||||
test("works alongside normal stdout and stderr", async () => {
|
||||
const result = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "echo 'stdout'; echo 'stderr' >&2; echo 'fd3' >&3"], {
|
||||
additionalFds: { fd3: { type: "output" } },
|
||||
})
|
||||
const stdout = yield* decodeByteStream(handle.stdout)
|
||||
const stderr = yield* decodeByteStream(handle.stderr)
|
||||
const fd3 = yield* decodeByteStream(handle.getOutputFd(3))
|
||||
yield* handle.exitCode
|
||||
return { stdout, stderr, fd3 }
|
||||
}),
|
||||
)
|
||||
expect(result.stdout).toBe("stdout")
|
||||
expect(result.stderr).toBe("stderr")
|
||||
expect(result.fd3).toBe("fd3")
|
||||
})
|
||||
})
|
||||
|
||||
describe("large output", () => {
|
||||
test(
|
||||
"does not deadlock on large stdout",
|
||||
async () => {
|
||||
const out = await runScoped(
|
||||
Effect.gen(function* () {
|
||||
const handle = yield* ChildProcess.make("sh", ["-c", "seq 1 100000"])
|
||||
const output = yield* handle.stdout.pipe(
|
||||
Stream.decodeText(),
|
||||
Stream.runFold(() => "", (acc, chunk) => acc + chunk),
|
||||
)
|
||||
yield* handle.exitCode
|
||||
return output
|
||||
}),
|
||||
)
|
||||
const lines = out.trim().split("\n")
|
||||
expect(lines.length).toBe(100000)
|
||||
expect(lines[0]).toBe("1")
|
||||
expect(lines[99999]).toBe("100000")
|
||||
},
|
||||
{ timeout: 10_000 },
|
||||
)
|
||||
})
|
||||
|
||||
describe("Windows-specific", () => {
|
||||
test("uses shell routing on Windows", async () => {
|
||||
if (process.platform !== "win32") return
|
||||
|
||||
const out = await run(
|
||||
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
|
||||
svc.string(
|
||||
ChildProcess.make("set", ["OPENCODE_TEST_SHELL"], {
|
||||
shell: true,
|
||||
extendEnv: true,
|
||||
env: { OPENCODE_TEST_SHELL: "ok" },
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
expect(out).toContain("OPENCODE_TEST_SHELL=ok")
|
||||
})
|
||||
|
||||
test("runs cmd scripts with spaces on Windows without shell", async () => {
|
||||
if (process.platform !== "win32") return
|
||||
|
||||
await using tmp = await tmpdir()
|
||||
const dir = path.join(tmp.path, "with space")
|
||||
const file = path.join(dir, "echo cmd.cmd")
|
||||
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
await Bun.write(file, "@echo off\r\nif %~1==--stdio exit /b 0\r\nexit /b 7\r\n")
|
||||
|
||||
const code = await run(
|
||||
ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
|
||||
svc.exitCode(
|
||||
ChildProcess.make(file, ["--stdio"], {
|
||||
stdin: "pipe",
|
||||
stdout: "pipe",
|
||||
stderr: "pipe",
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
expect(code).toBe(ChildProcessSpawner.ExitCode(0))
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user