mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-01 02:06:41 +00:00
refactor(ripgrep): use embedded wasm backend (#21703)
This commit is contained in:
@@ -1,28 +1,16 @@
|
||||
// Ripgrep utility functions
|
||||
import path from "path"
|
||||
import { Global } from "../global"
|
||||
import fs from "fs/promises"
|
||||
import path from "path"
|
||||
import { fileURLToPath } from "url"
|
||||
import z from "zod"
|
||||
import { Effect, Layer, Context, Schema } from "effect"
|
||||
import * as Stream from "effect/Stream"
|
||||
import { ChildProcess } from "effect/unstable/process"
|
||||
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import type { PlatformError } from "effect/PlatformError"
|
||||
import { NamedError } from "@opencode-ai/util/error"
|
||||
import { lazy } from "../util/lazy"
|
||||
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { AppFileSystem } from "../filesystem"
|
||||
import { Process } from "../util/process"
|
||||
import { which } from "../util/which"
|
||||
import { text } from "node:stream/consumers"
|
||||
|
||||
import { ZipReader, BlobReader, BlobWriter } from "@zip.js/zip.js"
|
||||
import { Cause, Context, Effect, Layer, Queue, Stream } from "effect"
|
||||
import { ripgrep } from "ripgrep"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { Log } from "@/util/log"
|
||||
|
||||
export namespace Ripgrep {
|
||||
const log = Log.create({ service: "ripgrep" })
|
||||
|
||||
const Stats = z.object({
|
||||
elapsed: z.object({
|
||||
secs: z.number(),
|
||||
@@ -94,437 +82,503 @@ export namespace Ripgrep {
|
||||
|
||||
const Result = z.union([Begin, Match, End, Summary])
|
||||
|
||||
const Hit = Schema.Struct({
|
||||
type: Schema.Literal("match"),
|
||||
data: Schema.Struct({
|
||||
path: Schema.Struct({
|
||||
text: Schema.String,
|
||||
}),
|
||||
lines: Schema.Struct({
|
||||
text: Schema.String,
|
||||
}),
|
||||
line_number: Schema.Number,
|
||||
absolute_offset: Schema.Number,
|
||||
submatches: Schema.mutable(
|
||||
Schema.Array(
|
||||
Schema.Struct({
|
||||
match: Schema.Struct({
|
||||
text: Schema.String,
|
||||
}),
|
||||
start: Schema.Number,
|
||||
end: Schema.Number,
|
||||
}),
|
||||
),
|
||||
),
|
||||
}),
|
||||
})
|
||||
|
||||
const Row = Schema.Union([
|
||||
Schema.Struct({ type: Schema.Literal("begin"), data: Schema.Unknown }),
|
||||
Hit,
|
||||
Schema.Struct({ type: Schema.Literal("end"), data: Schema.Unknown }),
|
||||
Schema.Struct({ type: Schema.Literal("summary"), data: Schema.Unknown }),
|
||||
])
|
||||
|
||||
const decode = Schema.decodeUnknownEffect(Schema.fromJsonString(Row))
|
||||
|
||||
export type Result = z.infer<typeof Result>
|
||||
export type Match = z.infer<typeof Match>
|
||||
export type Item = Match["data"]
|
||||
export type Begin = z.infer<typeof Begin>
|
||||
export type End = z.infer<typeof End>
|
||||
export type Summary = z.infer<typeof Summary>
|
||||
const PLATFORM = {
|
||||
"arm64-darwin": { platform: "aarch64-apple-darwin", extension: "tar.gz" },
|
||||
"arm64-linux": {
|
||||
platform: "aarch64-unknown-linux-gnu",
|
||||
extension: "tar.gz",
|
||||
},
|
||||
"x64-darwin": { platform: "x86_64-apple-darwin", extension: "tar.gz" },
|
||||
"x64-linux": { platform: "x86_64-unknown-linux-musl", extension: "tar.gz" },
|
||||
"arm64-win32": { platform: "aarch64-pc-windows-msvc", extension: "zip" },
|
||||
"x64-win32": { platform: "x86_64-pc-windows-msvc", extension: "zip" },
|
||||
} as const
|
||||
export type Row = Match["data"]
|
||||
|
||||
export const ExtractionFailedError = NamedError.create(
|
||||
"RipgrepExtractionFailedError",
|
||||
z.object({
|
||||
filepath: z.string(),
|
||||
stderr: z.string(),
|
||||
}),
|
||||
)
|
||||
|
||||
export const UnsupportedPlatformError = NamedError.create(
|
||||
"RipgrepUnsupportedPlatformError",
|
||||
z.object({
|
||||
platform: z.string(),
|
||||
}),
|
||||
)
|
||||
|
||||
export const DownloadFailedError = NamedError.create(
|
||||
"RipgrepDownloadFailedError",
|
||||
z.object({
|
||||
url: z.string(),
|
||||
status: z.number(),
|
||||
}),
|
||||
)
|
||||
|
||||
const state = lazy(async () => {
|
||||
const system = which("rg")
|
||||
if (system) {
|
||||
const stat = await fs.stat(system).catch(() => undefined)
|
||||
if (stat?.isFile()) return { filepath: system }
|
||||
log.warn("bun.which returned invalid rg path", { filepath: system })
|
||||
}
|
||||
const filepath = path.join(Global.Path.bin, "rg" + (process.platform === "win32" ? ".exe" : ""))
|
||||
|
||||
if (!(await Filesystem.exists(filepath))) {
|
||||
const platformKey = `${process.arch}-${process.platform}` as keyof typeof PLATFORM
|
||||
const config = PLATFORM[platformKey]
|
||||
if (!config) throw new UnsupportedPlatformError({ platform: platformKey })
|
||||
|
||||
const version = "14.1.1"
|
||||
const filename = `ripgrep-${version}-${config.platform}.${config.extension}`
|
||||
const url = `https://github.com/BurntSushi/ripgrep/releases/download/${version}/${filename}`
|
||||
|
||||
const response = await fetch(url)
|
||||
if (!response.ok) throw new DownloadFailedError({ url, status: response.status })
|
||||
|
||||
const arrayBuffer = await response.arrayBuffer()
|
||||
const archivePath = path.join(Global.Path.bin, filename)
|
||||
await Filesystem.write(archivePath, Buffer.from(arrayBuffer))
|
||||
if (config.extension === "tar.gz") {
|
||||
const args = ["tar", "-xzf", archivePath, "--strip-components=1"]
|
||||
|
||||
if (platformKey.endsWith("-darwin")) args.push("--include=*/rg")
|
||||
if (platformKey.endsWith("-linux")) args.push("--wildcards", "*/rg")
|
||||
|
||||
const proc = Process.spawn(args, {
|
||||
cwd: Global.Path.bin,
|
||||
stderr: "pipe",
|
||||
stdout: "pipe",
|
||||
})
|
||||
const exit = await proc.exited
|
||||
if (exit !== 0) {
|
||||
const stderr = proc.stderr ? await text(proc.stderr) : ""
|
||||
throw new ExtractionFailedError({
|
||||
filepath,
|
||||
stderr,
|
||||
})
|
||||
}
|
||||
}
|
||||
if (config.extension === "zip") {
|
||||
const zipFileReader = new ZipReader(new BlobReader(new Blob([arrayBuffer])))
|
||||
const entries = await zipFileReader.getEntries()
|
||||
let rgEntry: any
|
||||
for (const entry of entries) {
|
||||
if (entry.filename.endsWith("rg.exe")) {
|
||||
rgEntry = entry
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (!rgEntry) {
|
||||
throw new ExtractionFailedError({
|
||||
filepath: archivePath,
|
||||
stderr: "rg.exe not found in zip archive",
|
||||
})
|
||||
}
|
||||
|
||||
const rgBlob = await rgEntry.getData(new BlobWriter())
|
||||
if (!rgBlob) {
|
||||
throw new ExtractionFailedError({
|
||||
filepath: archivePath,
|
||||
stderr: "Failed to extract rg.exe from zip archive",
|
||||
})
|
||||
}
|
||||
await Filesystem.write(filepath, Buffer.from(await rgBlob.arrayBuffer()))
|
||||
await zipFileReader.close()
|
||||
}
|
||||
await fs.unlink(archivePath)
|
||||
if (!platformKey.endsWith("-win32")) await fs.chmod(filepath, 0o755)
|
||||
}
|
||||
|
||||
return {
|
||||
filepath,
|
||||
}
|
||||
})
|
||||
|
||||
export async function filepath() {
|
||||
const { filepath } = await state()
|
||||
return filepath
|
||||
export interface SearchResult {
|
||||
items: Item[]
|
||||
partial: boolean
|
||||
}
|
||||
|
||||
export async function* files(input: {
|
||||
export interface FilesInput {
|
||||
cwd: string
|
||||
glob?: string[]
|
||||
hidden?: boolean
|
||||
follow?: boolean
|
||||
maxDepth?: number
|
||||
signal?: AbortSignal
|
||||
}) {
|
||||
input.signal?.throwIfAborted()
|
||||
}
|
||||
|
||||
const args = [await filepath(), "--files", "--glob=!.git/*"]
|
||||
if (input.follow) args.push("--follow")
|
||||
if (input.hidden !== false) args.push("--hidden")
|
||||
if (input.maxDepth !== undefined) args.push(`--max-depth=${input.maxDepth}`)
|
||||
if (input.glob) {
|
||||
for (const g of input.glob) {
|
||||
args.push(`--glob=${g}`)
|
||||
}
|
||||
}
|
||||
export interface SearchInput {
|
||||
cwd: string
|
||||
pattern: string
|
||||
glob?: string[]
|
||||
limit?: number
|
||||
follow?: boolean
|
||||
file?: string[]
|
||||
signal?: AbortSignal
|
||||
}
|
||||
|
||||
// Guard against invalid cwd to provide a consistent ENOENT error.
|
||||
if (!(await fs.stat(input.cwd).catch(() => undefined))?.isDirectory()) {
|
||||
throw Object.assign(new Error(`No such file or directory: '${input.cwd}'`), {
|
||||
code: "ENOENT",
|
||||
errno: -2,
|
||||
path: input.cwd,
|
||||
})
|
||||
}
|
||||
|
||||
const proc = Process.spawn(args, {
|
||||
cwd: input.cwd,
|
||||
stdout: "pipe",
|
||||
stderr: "ignore",
|
||||
abort: input.signal,
|
||||
})
|
||||
|
||||
if (!proc.stdout) {
|
||||
throw new Error("Process output not available")
|
||||
}
|
||||
|
||||
let buffer = ""
|
||||
const stream = proc.stdout as AsyncIterable<Buffer | string>
|
||||
for await (const chunk of stream) {
|
||||
input.signal?.throwIfAborted()
|
||||
|
||||
buffer += typeof chunk === "string" ? chunk : chunk.toString()
|
||||
// Handle both Unix (\n) and Windows (\r\n) line endings
|
||||
const lines = buffer.split(/\r?\n/)
|
||||
buffer = lines.pop() || ""
|
||||
|
||||
for (const line of lines) {
|
||||
if (line) yield line
|
||||
}
|
||||
}
|
||||
|
||||
if (buffer) yield buffer
|
||||
await proc.exited
|
||||
|
||||
input.signal?.throwIfAborted()
|
||||
export interface TreeInput {
|
||||
cwd: string
|
||||
limit?: number
|
||||
signal?: AbortSignal
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly files: (input: {
|
||||
cwd: string
|
||||
glob?: string[]
|
||||
hidden?: boolean
|
||||
follow?: boolean
|
||||
maxDepth?: number
|
||||
}) => Stream.Stream<string, PlatformError>
|
||||
readonly search: (input: {
|
||||
cwd: string
|
||||
pattern: string
|
||||
glob?: string[]
|
||||
limit?: number
|
||||
follow?: boolean
|
||||
file?: string[]
|
||||
}) => Effect.Effect<{ items: Item[]; partial: boolean }, PlatformError | Error>
|
||||
readonly files: (input: FilesInput) => Stream.Stream<string, Error>
|
||||
readonly tree: (input: TreeInput) => Effect.Effect<string, Error>
|
||||
readonly search: (input: SearchInput) => Effect.Effect<SearchResult, Error>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Ripgrep") {}
|
||||
|
||||
export const layer: Layer.Layer<Service, never, ChildProcessSpawner | AppFileSystem.Service> = Layer.effect(
|
||||
type Run = { kind: "files" | "search"; cwd: string; args: string[] }
|
||||
|
||||
type WorkerResult = {
|
||||
type: "result"
|
||||
code: number
|
||||
stdout: string
|
||||
stderr: string
|
||||
}
|
||||
|
||||
type WorkerLine = {
|
||||
type: "line"
|
||||
line: string
|
||||
}
|
||||
|
||||
type WorkerDone = {
|
||||
type: "done"
|
||||
code: number
|
||||
stderr: string
|
||||
}
|
||||
|
||||
type WorkerError = {
|
||||
type: "error"
|
||||
error: {
|
||||
message: string
|
||||
name?: string
|
||||
stack?: string
|
||||
}
|
||||
}
|
||||
|
||||
function env() {
|
||||
const env = Object.fromEntries(
|
||||
Object.entries(process.env).filter((item): item is [string, string] => item[1] !== undefined),
|
||||
)
|
||||
delete env.RIPGREP_CONFIG_PATH
|
||||
return env
|
||||
}
|
||||
|
||||
function text(input: unknown) {
|
||||
if (typeof input === "string") return input
|
||||
if (input instanceof ArrayBuffer) return Buffer.from(input).toString()
|
||||
if (ArrayBuffer.isView(input)) return Buffer.from(input.buffer, input.byteOffset, input.byteLength).toString()
|
||||
return String(input)
|
||||
}
|
||||
|
||||
function toError(input: unknown) {
|
||||
if (input instanceof Error) return input
|
||||
if (typeof input === "string") return new Error(input)
|
||||
return new Error(String(input))
|
||||
}
|
||||
|
||||
function abort(signal?: AbortSignal) {
|
||||
const err = signal?.reason
|
||||
if (err instanceof Error) return err
|
||||
const out = new Error("Aborted")
|
||||
out.name = "AbortError"
|
||||
return out
|
||||
}
|
||||
|
||||
function error(stderr: string, code: number) {
|
||||
const err = new Error(stderr.trim() || `ripgrep failed with code ${code}`)
|
||||
err.name = "RipgrepError"
|
||||
return err
|
||||
}
|
||||
|
||||
function clean(file: string) {
|
||||
return path.normalize(file.replace(/^\.[\\/]/, ""))
|
||||
}
|
||||
|
||||
function row(data: Row): Row {
|
||||
return {
|
||||
...data,
|
||||
path: {
|
||||
...data.path,
|
||||
text: clean(data.path.text),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function opts(cwd: string) {
|
||||
return {
|
||||
env: env(),
|
||||
preopens: { ".": cwd },
|
||||
}
|
||||
}
|
||||
|
||||
function check(cwd: string) {
|
||||
return Effect.tryPromise({
|
||||
try: () => fs.stat(cwd).catch(() => undefined),
|
||||
catch: toError,
|
||||
}).pipe(
|
||||
Effect.flatMap((stat) =>
|
||||
stat?.isDirectory()
|
||||
? Effect.void
|
||||
: Effect.fail(
|
||||
Object.assign(new Error(`No such file or directory: '${cwd}'`), {
|
||||
code: "ENOENT",
|
||||
errno: -2,
|
||||
path: cwd,
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
function filesArgs(input: FilesInput) {
|
||||
const args = ["--files", "--glob=!.git/*"]
|
||||
if (input.follow) args.push("--follow")
|
||||
if (input.hidden !== false) args.push("--hidden")
|
||||
if (input.maxDepth !== undefined) args.push(`--max-depth=${input.maxDepth}`)
|
||||
if (input.glob) {
|
||||
for (const glob of input.glob) {
|
||||
args.push(`--glob=${glob}`)
|
||||
}
|
||||
}
|
||||
args.push(".")
|
||||
return args
|
||||
}
|
||||
|
||||
function searchArgs(input: SearchInput) {
|
||||
const args = ["--json", "--hidden", "--glob=!.git/*", "--no-messages"]
|
||||
if (input.follow) args.push("--follow")
|
||||
if (input.glob) {
|
||||
for (const glob of input.glob) {
|
||||
args.push(`--glob=${glob}`)
|
||||
}
|
||||
}
|
||||
if (input.limit) args.push(`--max-count=${input.limit}`)
|
||||
args.push("--", input.pattern, ...(input.file ?? ["."]))
|
||||
return args
|
||||
}
|
||||
|
||||
function parse(stdout: string) {
|
||||
return stdout
|
||||
.trim()
|
||||
.split(/\r?\n/)
|
||||
.filter(Boolean)
|
||||
.map((line) => Result.parse(JSON.parse(line)))
|
||||
.flatMap((item) => (item.type === "match" ? [row(item.data)] : []))
|
||||
}
|
||||
|
||||
function target() {
|
||||
const js = new URL("./ripgrep.worker.js", import.meta.url)
|
||||
return Effect.tryPromise({
|
||||
try: () => Filesystem.exists(fileURLToPath(js)),
|
||||
catch: toError,
|
||||
}).pipe(Effect.map((exists) => (exists ? js : new URL("./ripgrep.worker.ts", import.meta.url))))
|
||||
}
|
||||
|
||||
function worker() {
|
||||
return target().pipe(Effect.flatMap((file) => Effect.sync(() => new Worker(file, { env: env() }))))
|
||||
}
|
||||
|
||||
function drain(buf: string, chunk: unknown, push: (line: string) => void) {
|
||||
const lines = (buf + text(chunk)).split(/\r?\n/)
|
||||
buf = lines.pop() || ""
|
||||
for (const line of lines) {
|
||||
if (line) push(line)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
function fail(queue: Queue.Queue<string, Error | Cause.Done>, err: Error) {
|
||||
Queue.failCauseUnsafe(queue, Cause.fail(err))
|
||||
}
|
||||
|
||||
function searchDirect(input: SearchInput) {
|
||||
return Effect.tryPromise({
|
||||
try: () =>
|
||||
ripgrep(searchArgs(input), {
|
||||
buffer: true,
|
||||
...opts(input.cwd),
|
||||
}),
|
||||
catch: toError,
|
||||
}).pipe(
|
||||
Effect.flatMap((ret) => {
|
||||
const out = ret.stdout ?? ""
|
||||
if (ret.code !== 0 && ret.code !== 1 && ret.code !== 2) {
|
||||
return Effect.fail(error(ret.stderr ?? "", ret.code ?? 1))
|
||||
}
|
||||
return Effect.sync(() => ({
|
||||
items: ret.code === 1 ? [] : parse(out),
|
||||
partial: ret.code === 2,
|
||||
}))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
function searchWorker(input: SearchInput) {
|
||||
if (input.signal?.aborted) return Effect.fail(abort(input.signal))
|
||||
|
||||
return Effect.acquireUseRelease(
|
||||
worker(),
|
||||
(w) =>
|
||||
Effect.callback<SearchResult, Error>((resume, signal) => {
|
||||
let open = true
|
||||
const done = (effect: Effect.Effect<SearchResult, Error>) => {
|
||||
if (!open) return
|
||||
open = false
|
||||
resume(effect)
|
||||
}
|
||||
const onabort = () => done(Effect.fail(abort(input.signal)))
|
||||
|
||||
w.onerror = (evt) => {
|
||||
done(Effect.fail(toError(evt.error ?? evt.message)))
|
||||
}
|
||||
w.onmessage = (evt: MessageEvent<WorkerResult | WorkerError>) => {
|
||||
const msg = evt.data
|
||||
if (msg.type === "error") {
|
||||
done(Effect.fail(Object.assign(new Error(msg.error.message), msg.error)))
|
||||
return
|
||||
}
|
||||
if (msg.code === 1) {
|
||||
done(Effect.succeed({ items: [], partial: false }))
|
||||
return
|
||||
}
|
||||
if (msg.code !== 0 && msg.code !== 1 && msg.code !== 2) {
|
||||
done(Effect.fail(error(msg.stderr, msg.code)))
|
||||
return
|
||||
}
|
||||
done(
|
||||
Effect.sync(() => ({
|
||||
items: parse(msg.stdout),
|
||||
partial: msg.code === 2,
|
||||
})),
|
||||
)
|
||||
}
|
||||
|
||||
input.signal?.addEventListener("abort", onabort, { once: true })
|
||||
signal.addEventListener("abort", onabort, { once: true })
|
||||
w.postMessage({
|
||||
kind: "search",
|
||||
cwd: input.cwd,
|
||||
args: searchArgs(input),
|
||||
} satisfies Run)
|
||||
|
||||
return Effect.sync(() => {
|
||||
input.signal?.removeEventListener("abort", onabort)
|
||||
signal.removeEventListener("abort", onabort)
|
||||
w.onerror = null
|
||||
w.onmessage = null
|
||||
})
|
||||
}),
|
||||
(w) => Effect.sync(() => w.terminate()),
|
||||
)
|
||||
}
|
||||
|
||||
function filesDirect(input: FilesInput) {
|
||||
return Stream.callback<string, Error>(
|
||||
Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
|
||||
let buf = ""
|
||||
let err = ""
|
||||
|
||||
const out = {
|
||||
write(chunk: unknown) {
|
||||
buf = drain(buf, chunk, (line) => {
|
||||
Queue.offerUnsafe(queue, clean(line))
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
const stderr = {
|
||||
write(chunk: unknown) {
|
||||
err += text(chunk)
|
||||
},
|
||||
}
|
||||
|
||||
yield* Effect.forkScoped(
|
||||
Effect.gen(function* () {
|
||||
yield* check(input.cwd)
|
||||
const ret = yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
ripgrep(filesArgs(input), {
|
||||
stdout: out,
|
||||
stderr,
|
||||
...opts(input.cwd),
|
||||
}),
|
||||
catch: toError,
|
||||
})
|
||||
if (buf) Queue.offerUnsafe(queue, clean(buf))
|
||||
if (ret.code === 0 || ret.code === 1) {
|
||||
Queue.endUnsafe(queue)
|
||||
return
|
||||
}
|
||||
fail(queue, error(err, ret.code ?? 1))
|
||||
}).pipe(
|
||||
Effect.catch((err) =>
|
||||
Effect.sync(() => {
|
||||
fail(queue, err)
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
function filesWorker(input: FilesInput) {
|
||||
return Stream.callback<string, Error>(
|
||||
Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
|
||||
if (input.signal?.aborted) {
|
||||
fail(queue, abort(input.signal))
|
||||
return
|
||||
}
|
||||
|
||||
const w = yield* Effect.acquireRelease(worker(), (w) => Effect.sync(() => w.terminate()))
|
||||
let open = true
|
||||
const close = () => {
|
||||
if (!open) return false
|
||||
open = false
|
||||
return true
|
||||
}
|
||||
const onabort = () => {
|
||||
if (!close()) return
|
||||
fail(queue, abort(input.signal))
|
||||
}
|
||||
|
||||
w.onerror = (evt) => {
|
||||
if (!close()) return
|
||||
fail(queue, toError(evt.error ?? evt.message))
|
||||
}
|
||||
w.onmessage = (evt: MessageEvent<WorkerLine | WorkerDone | WorkerError>) => {
|
||||
const msg = evt.data
|
||||
if (msg.type === "line") {
|
||||
if (open) Queue.offerUnsafe(queue, msg.line)
|
||||
return
|
||||
}
|
||||
if (!close()) return
|
||||
if (msg.type === "error") {
|
||||
fail(queue, Object.assign(new Error(msg.error.message), msg.error))
|
||||
return
|
||||
}
|
||||
if (msg.code === 0 || msg.code === 1) {
|
||||
Queue.endUnsafe(queue)
|
||||
return
|
||||
}
|
||||
fail(queue, error(msg.stderr, msg.code))
|
||||
}
|
||||
|
||||
yield* Effect.acquireRelease(
|
||||
Effect.sync(() => {
|
||||
input.signal?.addEventListener("abort", onabort, { once: true })
|
||||
w.postMessage({
|
||||
kind: "files",
|
||||
cwd: input.cwd,
|
||||
args: filesArgs(input),
|
||||
} satisfies Run)
|
||||
}),
|
||||
() =>
|
||||
Effect.sync(() => {
|
||||
input.signal?.removeEventListener("abort", onabort)
|
||||
w.onerror = null
|
||||
w.onmessage = null
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const spawner = yield* ChildProcessSpawner
|
||||
const afs = yield* AppFileSystem.Service
|
||||
const bin = Effect.fn("Ripgrep.path")(function* () {
|
||||
return yield* Effect.promise(() => filepath())
|
||||
})
|
||||
const args = Effect.fn("Ripgrep.args")(function* (input: {
|
||||
mode: "files" | "search"
|
||||
glob?: string[]
|
||||
hidden?: boolean
|
||||
follow?: boolean
|
||||
maxDepth?: number
|
||||
limit?: number
|
||||
pattern?: string
|
||||
file?: string[]
|
||||
}) {
|
||||
const out = [yield* bin(), input.mode === "search" ? "--json" : "--files", "--glob=!.git/*"]
|
||||
if (input.follow) out.push("--follow")
|
||||
if (input.hidden !== false) out.push("--hidden")
|
||||
if (input.maxDepth !== undefined) out.push(`--max-depth=${input.maxDepth}`)
|
||||
if (input.glob) {
|
||||
for (const g of input.glob) {
|
||||
out.push(`--glob=${g}`)
|
||||
const source = (input: FilesInput) => {
|
||||
const useWorker = !!input.signal && typeof Worker !== "undefined"
|
||||
if (!useWorker && input.signal) {
|
||||
log.warn("worker unavailable, ripgrep abort disabled")
|
||||
}
|
||||
return useWorker ? filesWorker(input) : filesDirect(input)
|
||||
}
|
||||
|
||||
const files: Interface["files"] = (input) => source(input)
|
||||
|
||||
const tree: Interface["tree"] = Effect.fn("Ripgrep.tree")(function* (input: TreeInput) {
|
||||
log.info("tree", input)
|
||||
const list = Array.from(yield* source({ cwd: input.cwd, signal: input.signal }).pipe(Stream.runCollect))
|
||||
|
||||
interface Node {
|
||||
name: string
|
||||
children: Map<string, Node>
|
||||
}
|
||||
|
||||
function child(node: Node, name: string) {
|
||||
const item = node.children.get(name)
|
||||
if (item) return item
|
||||
const next = { name, children: new Map() }
|
||||
node.children.set(name, next)
|
||||
return next
|
||||
}
|
||||
|
||||
function count(node: Node): number {
|
||||
return Array.from(node.children.values()).reduce((sum, child) => sum + 1 + count(child), 0)
|
||||
}
|
||||
|
||||
const root: Node = { name: "", children: new Map() }
|
||||
for (const file of list) {
|
||||
if (file.includes(".opencode")) continue
|
||||
const parts = file.split(path.sep)
|
||||
if (parts.length < 2) continue
|
||||
let node = root
|
||||
for (const part of parts.slice(0, -1)) {
|
||||
node = child(node, part)
|
||||
}
|
||||
}
|
||||
if (input.limit) out.push(`--max-count=${input.limit}`)
|
||||
if (input.mode === "search") out.push("--no-messages")
|
||||
if (input.pattern) out.push("--", input.pattern, ...(input.file ?? []))
|
||||
return out
|
||||
})
|
||||
|
||||
const files = Effect.fn("Ripgrep.files")(function* (input: {
|
||||
cwd: string
|
||||
glob?: string[]
|
||||
hidden?: boolean
|
||||
follow?: boolean
|
||||
maxDepth?: number
|
||||
}) {
|
||||
const rgPath = yield* bin()
|
||||
const isDir = yield* afs.isDir(input.cwd)
|
||||
if (!isDir) {
|
||||
return yield* Effect.die(
|
||||
Object.assign(new Error(`No such file or directory: '${input.cwd}'`), {
|
||||
code: "ENOENT" as const,
|
||||
errno: -2,
|
||||
path: input.cwd,
|
||||
}),
|
||||
const total = count(root)
|
||||
const limit = input.limit ?? total
|
||||
const lines: string[] = []
|
||||
const queue: Array<{ node: Node; path: string }> = Array.from(root.children.values())
|
||||
.sort((a, b) => a.name.localeCompare(b.name))
|
||||
.map((node) => ({ node, path: node.name }))
|
||||
|
||||
let used = 0
|
||||
for (let i = 0; i < queue.length && used < limit; i++) {
|
||||
const item = queue[i]
|
||||
lines.push(item.path)
|
||||
used++
|
||||
queue.push(
|
||||
...Array.from(item.node.children.values())
|
||||
.sort((a, b) => a.name.localeCompare(b.name))
|
||||
.map((node) => ({ node, path: `${item.path}/${node.name}` })),
|
||||
)
|
||||
}
|
||||
|
||||
const cmd = yield* args({
|
||||
mode: "files",
|
||||
glob: input.glob,
|
||||
hidden: input.hidden,
|
||||
follow: input.follow,
|
||||
maxDepth: input.maxDepth,
|
||||
})
|
||||
|
||||
return spawner
|
||||
.streamLines(ChildProcess.make(cmd[0], cmd.slice(1), { cwd: input.cwd }))
|
||||
.pipe(Stream.filter((line: string) => line.length > 0))
|
||||
if (total > used) lines.push(`[${total - used} truncated]`)
|
||||
return lines.join("\n")
|
||||
})
|
||||
|
||||
const search = Effect.fn("Ripgrep.search")(function* (input: {
|
||||
cwd: string
|
||||
pattern: string
|
||||
glob?: string[]
|
||||
limit?: number
|
||||
follow?: boolean
|
||||
file?: string[]
|
||||
}) {
|
||||
return yield* Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const cmd = yield* args({
|
||||
mode: "search",
|
||||
glob: input.glob,
|
||||
follow: input.follow,
|
||||
limit: input.limit,
|
||||
pattern: input.pattern,
|
||||
file: input.file,
|
||||
})
|
||||
|
||||
const handle = yield* spawner.spawn(
|
||||
ChildProcess.make(cmd[0], cmd.slice(1), {
|
||||
cwd: input.cwd,
|
||||
stdin: "ignore",
|
||||
}),
|
||||
)
|
||||
|
||||
const [items, stderr, code] = yield* Effect.all(
|
||||
[
|
||||
Stream.decodeText(handle.stdout).pipe(
|
||||
Stream.splitLines,
|
||||
Stream.filter((line) => line.length > 0),
|
||||
Stream.mapEffect((line) =>
|
||||
decode(line).pipe(Effect.mapError((cause) => new Error("invalid ripgrep output", { cause }))),
|
||||
),
|
||||
Stream.filter((row): row is Schema.Schema.Type<typeof Hit> => row.type === "match"),
|
||||
Stream.map((row): Item => row.data),
|
||||
Stream.runCollect,
|
||||
Effect.map((chunk) => [...chunk]),
|
||||
),
|
||||
Stream.mkString(Stream.decodeText(handle.stderr)),
|
||||
handle.exitCode,
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
if (code !== 0 && code !== 1 && code !== 2) {
|
||||
return yield* Effect.fail(new Error(`ripgrep failed: ${stderr}`))
|
||||
}
|
||||
|
||||
return {
|
||||
items,
|
||||
partial: code === 2,
|
||||
}
|
||||
}),
|
||||
)
|
||||
const search: Interface["search"] = Effect.fn("Ripgrep.search")(function* (input: SearchInput) {
|
||||
const useWorker = !!input.signal && typeof Worker !== "undefined"
|
||||
if (!useWorker && input.signal) {
|
||||
log.warn("worker unavailable, ripgrep abort disabled")
|
||||
}
|
||||
return yield* useWorker ? searchWorker(input) : searchDirect(input)
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
files: (input) => Stream.unwrap(files(input)),
|
||||
search,
|
||||
})
|
||||
return Service.of({ files, tree, search })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(AppFileSystem.defaultLayer),
|
||||
Layer.provide(CrossSpawnSpawner.defaultLayer),
|
||||
)
|
||||
export const defaultLayer = layer
|
||||
|
||||
export async function tree(input: { cwd: string; limit?: number; signal?: AbortSignal }) {
|
||||
log.info("tree", input)
|
||||
const files = await Array.fromAsync(Ripgrep.files({ cwd: input.cwd, signal: input.signal }))
|
||||
interface Node {
|
||||
name: string
|
||||
children: Map<string, Node>
|
||||
}
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
function dir(node: Node, name: string) {
|
||||
const existing = node.children.get(name)
|
||||
if (existing) return existing
|
||||
const next = { name, children: new Map() }
|
||||
node.children.set(name, next)
|
||||
return next
|
||||
}
|
||||
export function files(input: FilesInput) {
|
||||
return runPromise((svc) => Stream.toAsyncIterableEffect(svc.files(input)))
|
||||
}
|
||||
|
||||
const root: Node = { name: "", children: new Map() }
|
||||
for (const file of files) {
|
||||
if (file.includes(".opencode")) continue
|
||||
const parts = file.split(path.sep)
|
||||
if (parts.length < 2) continue
|
||||
let node = root
|
||||
for (const part of parts.slice(0, -1)) {
|
||||
node = dir(node, part)
|
||||
}
|
||||
}
|
||||
export function tree(input: TreeInput) {
|
||||
return runPromise((svc) => svc.tree(input))
|
||||
}
|
||||
|
||||
function count(node: Node): number {
|
||||
let total = 0
|
||||
for (const child of node.children.values()) {
|
||||
total += 1 + count(child)
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
const total = count(root)
|
||||
const limit = input.limit ?? total
|
||||
const lines: string[] = []
|
||||
const queue: { node: Node; path: string }[] = []
|
||||
for (const child of Array.from(root.children.values()).sort((a, b) => a.name.localeCompare(b.name))) {
|
||||
queue.push({ node: child, path: child.name })
|
||||
}
|
||||
|
||||
let used = 0
|
||||
for (let i = 0; i < queue.length && used < limit; i++) {
|
||||
const { node, path } = queue[i]
|
||||
lines.push(path)
|
||||
used++
|
||||
for (const child of Array.from(node.children.values()).sort((a, b) => a.name.localeCompare(b.name))) {
|
||||
queue.push({ node: child, path: `${path}/${child.name}` })
|
||||
}
|
||||
}
|
||||
|
||||
if (total > used) lines.push(`[${total - used} truncated]`)
|
||||
|
||||
return lines.join("\n")
|
||||
export function search(input: SearchInput) {
|
||||
return runPromise((svc) => svc.search(input))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user