mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-14 09:54:46 +00:00
Compare commits
2 Commits
kit/questi
...
kit/shell-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a413edc8a | ||
|
|
8f1c6e08a7 |
@@ -3,6 +3,7 @@ import { randomBytes } from "crypto"
|
||||
|
||||
export namespace Identifier {
|
||||
const prefixes = {
|
||||
job: "job",
|
||||
event: "evt",
|
||||
session: "ses",
|
||||
message: "msg",
|
||||
|
||||
301
packages/opencode/src/shell-job/index.ts
Normal file
301
packages/opencode/src/shell-job/index.ts
Normal file
@@ -0,0 +1,301 @@
|
||||
import path from "path"
|
||||
import * as NodeFS from "fs/promises"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
import { Shell } from "@/shell/shell"
|
||||
import { Effect, Layer, Scope, Deferred, Stream, Context, Exit, Schema, Struct } from "effect"
|
||||
import { ChildProcess } from "effect/unstable/process"
|
||||
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
|
||||
import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner"
|
||||
|
||||
import { JobID } from "./schema"
|
||||
|
||||
const PS = new Set(["powershell", "pwsh"])
|
||||
|
||||
export namespace ShellJob {
|
||||
export const Status = Schema.Literals(["running", "completed", "failed", "killed", "timed_out"])
|
||||
export type Status = Schema.Schema.Type<typeof Status>
|
||||
|
||||
export class Info extends Schema.Class<Info>("ShellJob.Info")({
|
||||
id: JobID,
|
||||
command: Schema.String,
|
||||
cwd: Schema.String,
|
||||
shell: Schema.String,
|
||||
title: Schema.optional(Schema.String),
|
||||
status: Status,
|
||||
pid: Schema.optional(Schema.Number),
|
||||
started_at: Schema.Number,
|
||||
ended_at: Schema.optional(Schema.Number),
|
||||
exit_code: Schema.optional(Schema.NullOr(Schema.Number)),
|
||||
output_path: Schema.String,
|
||||
meta_path: Schema.String,
|
||||
cursor: Schema.Number,
|
||||
}) {}
|
||||
|
||||
export class Output extends Schema.Class<Output>("ShellJob.Output")({
|
||||
text: Schema.String,
|
||||
cursor: Schema.Number,
|
||||
done: Schema.Boolean,
|
||||
}) {}
|
||||
|
||||
export class StartInput extends Schema.Class<StartInput>("ShellJob.StartInput")({
|
||||
command: Schema.String,
|
||||
cwd: Schema.optional(Schema.String),
|
||||
shell: Schema.optional(Schema.String),
|
||||
title: Schema.optional(Schema.String),
|
||||
timeout: Schema.optional(Schema.Number),
|
||||
env: Schema.optional(Schema.Record(Schema.String, Schema.String)),
|
||||
}) {}
|
||||
|
||||
export class WaitInput extends Schema.Class<WaitInput>("ShellJob.WaitInput")({
|
||||
id: JobID,
|
||||
timeout: Schema.optional(Schema.Number),
|
||||
}) {}
|
||||
|
||||
export class OutputInput extends Schema.Class<OutputInput>("ShellJob.OutputInput")({
|
||||
id: JobID,
|
||||
cursor: Schema.optional(Schema.Number),
|
||||
}) {}
|
||||
|
||||
type Active = {
|
||||
info: Struct.Mutable<Info>
|
||||
next: Status | undefined
|
||||
done: Deferred.Deferred<Info>
|
||||
handle: ChildProcessHandle | undefined
|
||||
}
|
||||
|
||||
type State = {
|
||||
dir: string
|
||||
root: string
|
||||
jobs: Map<JobID, Active>
|
||||
scope: Scope.Scope
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly list: () => Effect.Effect<Info[]>
|
||||
readonly get: (id: JobID) => Effect.Effect<Info | undefined>
|
||||
readonly start: (input: StartInput) => Effect.Effect<Info>
|
||||
readonly output: (input: OutputInput) => Effect.Effect<Output | undefined>
|
||||
readonly wait: (input: WaitInput) => Effect.Effect<Info | undefined>
|
||||
readonly kill: (id: JobID) => Effect.Effect<Info | undefined>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/ShellJob") {}
|
||||
|
||||
function spawn(shell: string, name: string, command: string, cwd: string, env: NodeJS.ProcessEnv) {
|
||||
if (process.platform === "win32" && PS.has(name)) {
|
||||
return ChildProcess.make(shell, ["-NoLogo", "-NoProfile", "-NonInteractive", "-Command", command], {
|
||||
cwd,
|
||||
env,
|
||||
stdin: "ignore",
|
||||
detached: false,
|
||||
})
|
||||
}
|
||||
|
||||
return ChildProcess.make(command, [], {
|
||||
shell,
|
||||
cwd,
|
||||
env,
|
||||
stdin: "ignore",
|
||||
detached: process.platform !== "win32",
|
||||
})
|
||||
}
|
||||
|
||||
const snap = (job: Active) =>
|
||||
new Info({
|
||||
...job.info,
|
||||
id: String(job.info.id),
|
||||
})
|
||||
|
||||
export const layer: Layer.Layer<Service, never, AppFileSystem.Service | ChildProcessSpawner> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const spawner = yield* ChildProcessSpawner
|
||||
|
||||
const append = Effect.fn("ShellJob.append")(function* (job: Active, chunk: string) {
|
||||
yield* Effect.tryPromise({
|
||||
try: () => NodeFS.appendFile(job.info.output_path, chunk, "utf8"),
|
||||
catch: () => new Error("Failed to append shell job output"),
|
||||
}).pipe(Effect.orDie)
|
||||
})
|
||||
|
||||
const write = Effect.fn("ShellJob.write")(function* (job: Active) {
|
||||
yield* fs.writeJson(job.info.meta_path, job.info).pipe(Effect.orDie)
|
||||
})
|
||||
|
||||
const end = Effect.fn("ShellJob.end")(function* (job: Active, status: Status, code?: number | null) {
|
||||
if (job.info.status !== "running") return snap(job)
|
||||
job.info.status = status
|
||||
job.info.ended_at = Date.now()
|
||||
job.info.exit_code = code
|
||||
job.handle = undefined
|
||||
job.next = undefined
|
||||
yield* write(job)
|
||||
const info = snap(job)
|
||||
yield* Deferred.succeed(job.done, info).pipe(Effect.ignore)
|
||||
return info
|
||||
})
|
||||
|
||||
const watch = Effect.fn("ShellJob.watch")(function* (job: Active, timeout?: number) {
|
||||
const handle = job.handle
|
||||
if (!handle) return snap(job)
|
||||
|
||||
if (timeout) {
|
||||
yield* Effect.sleep(`${timeout} millis`).pipe(
|
||||
Effect.andThen(
|
||||
Effect.gen(function* () {
|
||||
if (job.info.status !== "running") return
|
||||
job.next = "timed_out"
|
||||
yield* handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore)
|
||||
}),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
}
|
||||
|
||||
yield* Effect.forkScoped(
|
||||
Stream.runForEach(Stream.decodeText(handle.all), (chunk) =>
|
||||
Effect.gen(function* () {
|
||||
job.info.cursor += chunk.length
|
||||
yield* append(job, chunk)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const exit = yield* Effect.exit(handle.exitCode)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
const code = Number(exit.value)
|
||||
return yield* end(job, code === 0 ? "completed" : "failed", code)
|
||||
}
|
||||
|
||||
return yield* end(job, job.next ?? "killed", null)
|
||||
})
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("ShellJob.state")(function* (ctx) {
|
||||
const dir = ctx.project.vcs ? ctx.worktree : ctx.directory
|
||||
const root = path.join(dir, ".opencode", "jobs")
|
||||
const state: State = {
|
||||
dir: ctx.directory,
|
||||
root,
|
||||
jobs: new Map(),
|
||||
scope: yield* Scope.Scope,
|
||||
}
|
||||
|
||||
yield* fs.ensureDir(root).pipe(Effect.orDie)
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.gen(function* () {
|
||||
state.jobs.clear()
|
||||
}),
|
||||
)
|
||||
|
||||
return state
|
||||
}),
|
||||
)
|
||||
|
||||
const list: Interface["list"] = Effect.fn("ShellJob.list")(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return Array.from(s.jobs.values())
|
||||
.map(snap)
|
||||
.toSorted((a, b) => a.started_at - b.started_at)
|
||||
})
|
||||
|
||||
const get: Interface["get"] = Effect.fn("ShellJob.get")(function* (id: JobID) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const job = s.jobs.get(id)
|
||||
if (!job) return
|
||||
return snap(job)
|
||||
})
|
||||
|
||||
const start: Interface["start"] = Effect.fn("ShellJob.start")(function* (input: StartInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const id = JobID.ascending()
|
||||
const dir = path.join(s.root, String(id))
|
||||
const cwd = input.cwd ?? s.dir
|
||||
const shell = input.shell ?? Shell.acceptable()
|
||||
const name = Shell.name(shell)
|
||||
const handle = yield* Scope.provide(s.scope)(
|
||||
spawner.spawn(
|
||||
spawn(shell, name, input.command, cwd, {
|
||||
...process.env,
|
||||
...input.env,
|
||||
}),
|
||||
),
|
||||
).pipe(Effect.orDie)
|
||||
|
||||
const job: Active = {
|
||||
info: {
|
||||
id,
|
||||
command: input.command,
|
||||
cwd,
|
||||
shell,
|
||||
title: input.title,
|
||||
status: "running",
|
||||
pid: Number(handle.pid),
|
||||
started_at: Date.now(),
|
||||
output_path: path.join(dir, "output.log"),
|
||||
meta_path: path.join(dir, "meta.json"),
|
||||
cursor: 0,
|
||||
} satisfies Struct.Mutable<Info>,
|
||||
next: undefined,
|
||||
done: yield* Deferred.make<Info>(),
|
||||
handle,
|
||||
}
|
||||
|
||||
s.jobs.set(id, job)
|
||||
yield* fs.writeWithDirs(job.info.output_path, "").pipe(Effect.orDie)
|
||||
yield* write(job)
|
||||
yield* Effect.sync(() => {
|
||||
Effect.runFork(Scope.provide(s.scope)(watch(job, input.timeout)))
|
||||
})
|
||||
return snap(job)
|
||||
})
|
||||
|
||||
const output: Interface["output"] = Effect.fn("ShellJob.output")(function* (input: OutputInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const job = s.jobs.get(input.id)
|
||||
if (!job) return
|
||||
const cursor = input.cursor ?? 0
|
||||
const text = yield* fs.readFileString(job.info.output_path).pipe(Effect.catch(() => Effect.succeed("")))
|
||||
return new Output({
|
||||
text: cursor >= text.length ? "" : text.slice(cursor),
|
||||
cursor: text.length,
|
||||
done: job.info.status !== "running",
|
||||
})
|
||||
})
|
||||
|
||||
const wait: Interface["wait"] = Effect.fn("ShellJob.wait")(function* (input: WaitInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const job = s.jobs.get(input.id)
|
||||
if (!job) return
|
||||
if (job.info.status !== "running") return snap(job)
|
||||
if (!input.timeout) return yield* Deferred.await(job.done)
|
||||
return yield* Effect.raceAll([
|
||||
Deferred.await(job.done),
|
||||
Effect.sleep(`${input.timeout} millis`).pipe(Effect.as(snap(job))),
|
||||
])
|
||||
})
|
||||
|
||||
const kill: Interface["kill"] = Effect.fn("ShellJob.kill")(function* (id: JobID) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const job = s.jobs.get(id)
|
||||
if (!job) return
|
||||
if (job.info.status !== "running") return snap(job)
|
||||
if (!job.handle) return snap(job)
|
||||
if (!job.next) job.next = "killed"
|
||||
yield* job.handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore)
|
||||
return yield* Deferred.await(job.done)
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
list,
|
||||
get,
|
||||
start,
|
||||
output,
|
||||
wait,
|
||||
kill,
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
10
packages/opencode/src/shell-job/schema.ts
Normal file
10
packages/opencode/src/shell-job/schema.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { Schema } from "effect"
|
||||
|
||||
import { Identifier } from "@/id/id"
|
||||
import { Newtype } from "@/util/schema"
|
||||
|
||||
export class JobID extends Newtype<JobID>()("JobID", Schema.String) {
|
||||
static ascending(id?: string): JobID {
|
||||
return this.make(Identifier.ascending("job", id))
|
||||
}
|
||||
}
|
||||
152
packages/opencode/test/shell-job/shell-job.test.ts
Normal file
152
packages/opencode/test/shell-job/shell-job.test.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
import { describe, expect } from "bun:test"
|
||||
import { Effect, Layer } from "effect"
|
||||
|
||||
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
|
||||
import { AppFileSystem } from "../../src/filesystem"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { Shell } from "../../src/shell/shell"
|
||||
import { ShellJob } from "../../src/shell-job"
|
||||
import { provideTmpdirInstance } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
const it = testEffect(
|
||||
Layer.mergeAll(
|
||||
CrossSpawnSpawner.defaultLayer,
|
||||
ShellJob.layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(AppFileSystem.defaultLayer)),
|
||||
),
|
||||
)
|
||||
|
||||
const quote = (text: string) => `"${text}"`
|
||||
const squote = (text: string) => `'${text}'`
|
||||
const shell = () => Shell.name(Shell.acceptable())
|
||||
const evalarg = (text: string) => (shell() === "cmd" ? quote(text) : squote(text))
|
||||
const node = (script: string) => {
|
||||
const text = `${quote(process.execPath.replaceAll("\\", "/"))} -e ${evalarg(script)}`
|
||||
if (shell() === "powershell" || shell() === "pwsh") return `& ${text}`
|
||||
return text
|
||||
}
|
||||
|
||||
const alive = (pid: number) => {
|
||||
try {
|
||||
process.kill(pid, 0)
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
describe("shell-job", () => {
|
||||
it.live("captures output and persists spool files", () =>
|
||||
provideTmpdirInstance((dir) =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* ShellJob.Service
|
||||
const job = yield* jobs.start({
|
||||
command: node('process.stdout.write("ok")'),
|
||||
cwd: dir,
|
||||
title: "ok",
|
||||
})
|
||||
const done = yield* jobs.wait({ id: job.id })
|
||||
const out = yield* jobs.output({ id: job.id })
|
||||
|
||||
expect(done).toBeDefined()
|
||||
expect(done?.status).toBe("completed")
|
||||
expect(done?.pid).toBeGreaterThan(0)
|
||||
expect(out).toEqual({ text: "ok", cursor: 2, done: true })
|
||||
|
||||
const log = yield* Effect.promise(() => Bun.file(done!.output_path).text())
|
||||
const meta = yield* Effect.promise(() => Bun.file(done!.meta_path).json())
|
||||
expect(log).toBe("ok")
|
||||
expect(meta).toMatchObject({
|
||||
id: done!.id,
|
||||
status: "completed",
|
||||
title: "ok",
|
||||
cursor: 2,
|
||||
})
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("reads output incrementally with a cursor", () =>
|
||||
provideTmpdirInstance((dir) =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* ShellJob.Service
|
||||
const job = yield* jobs.start({
|
||||
command: node(
|
||||
'process.stdout.write("a"); setTimeout(() => process.stdout.write("b"), 200); setTimeout(() => process.exit(0), 350)',
|
||||
),
|
||||
cwd: dir,
|
||||
})
|
||||
|
||||
yield* Effect.sleep("100 millis")
|
||||
const a = yield* jobs.output({ id: job.id })
|
||||
const done = yield* jobs.wait({ id: job.id })
|
||||
const b = yield* jobs.output({ id: job.id, cursor: a?.cursor ?? 0 })
|
||||
|
||||
expect(a).toEqual({ text: "a", cursor: 1, done: false })
|
||||
expect(done?.status).toBe("completed")
|
||||
expect(b).toEqual({ text: "b", cursor: 2, done: true })
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("marks non-zero exits as failed", () =>
|
||||
provideTmpdirInstance((dir) =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* ShellJob.Service
|
||||
const job = yield* jobs.start({
|
||||
command: node('process.stderr.write("bad"); process.exit(7)'),
|
||||
cwd: dir,
|
||||
})
|
||||
const done = yield* jobs.wait({ id: job.id })
|
||||
const out = yield* jobs.output({ id: job.id })
|
||||
|
||||
expect(done).toBeDefined()
|
||||
expect(done?.status).toBe("failed")
|
||||
expect(done?.exit_code).toBe(7)
|
||||
expect(out?.text).toBe("bad")
|
||||
expect(out?.done).toBe(true)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("kills a running job and returns final state", () =>
|
||||
provideTmpdirInstance((dir) =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* ShellJob.Service
|
||||
const job = yield* jobs.start({
|
||||
command: node("setInterval(() => {}, 1000)"),
|
||||
cwd: dir,
|
||||
})
|
||||
|
||||
yield* Effect.sleep("50 millis")
|
||||
const done = yield* jobs.kill(job.id)
|
||||
|
||||
expect(done).toBeDefined()
|
||||
expect(done?.status).toBe("killed")
|
||||
expect(done?.exit_code).toBeNull()
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("kills running jobs when the instance is disposed", () => {
|
||||
if (process.platform === "win32") return Effect.void
|
||||
|
||||
return provideTmpdirInstance((dir) =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* ShellJob.Service
|
||||
const job = yield* jobs.start({
|
||||
command: node("setInterval(() => {}, 1000)"),
|
||||
cwd: dir,
|
||||
})
|
||||
|
||||
expect(job.pid).toBeGreaterThan(0)
|
||||
yield* Effect.sleep("50 millis")
|
||||
expect(alive(job.pid!)).toBe(true)
|
||||
|
||||
yield* Effect.promise(() => Instance.dispose())
|
||||
yield* Effect.sleep("100 millis")
|
||||
expect(alive(job.pid!)).toBe(false)
|
||||
}),
|
||||
)
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user