Compare commits

...

2 Commits

Author SHA1 Message Date
Kit Langton
5a413edc8a test(shell-job): make shell command fixtures portable 2026-04-13 19:28:21 -04:00
Kit Langton
8f1c6e08a7 feat(shell-job): add isolated shell job service 2026-04-13 19:17:07 -04:00
4 changed files with 464 additions and 0 deletions

View File

@@ -3,6 +3,7 @@ import { randomBytes } from "crypto"
export namespace Identifier {
const prefixes = {
job: "job",
event: "evt",
session: "ses",
message: "msg",

View File

@@ -0,0 +1,301 @@
import path from "path"
import * as NodeFS from "fs/promises"
import { InstanceState } from "@/effect/instance-state"
import { AppFileSystem } from "@/filesystem"
import { Shell } from "@/shell/shell"
import { Effect, Layer, Scope, Deferred, Stream, Context, Exit, Schema, Struct } from "effect"
import { ChildProcess } from "effect/unstable/process"
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner"
import { JobID } from "./schema"
const PS = new Set(["powershell", "pwsh"])
export namespace ShellJob {
export const Status = Schema.Literals(["running", "completed", "failed", "killed", "timed_out"])
export type Status = Schema.Schema.Type<typeof Status>
export class Info extends Schema.Class<Info>("ShellJob.Info")({
id: JobID,
command: Schema.String,
cwd: Schema.String,
shell: Schema.String,
title: Schema.optional(Schema.String),
status: Status,
pid: Schema.optional(Schema.Number),
started_at: Schema.Number,
ended_at: Schema.optional(Schema.Number),
exit_code: Schema.optional(Schema.NullOr(Schema.Number)),
output_path: Schema.String,
meta_path: Schema.String,
cursor: Schema.Number,
}) {}
export class Output extends Schema.Class<Output>("ShellJob.Output")({
text: Schema.String,
cursor: Schema.Number,
done: Schema.Boolean,
}) {}
export class StartInput extends Schema.Class<StartInput>("ShellJob.StartInput")({
command: Schema.String,
cwd: Schema.optional(Schema.String),
shell: Schema.optional(Schema.String),
title: Schema.optional(Schema.String),
timeout: Schema.optional(Schema.Number),
env: Schema.optional(Schema.Record(Schema.String, Schema.String)),
}) {}
export class WaitInput extends Schema.Class<WaitInput>("ShellJob.WaitInput")({
id: JobID,
timeout: Schema.optional(Schema.Number),
}) {}
export class OutputInput extends Schema.Class<OutputInput>("ShellJob.OutputInput")({
id: JobID,
cursor: Schema.optional(Schema.Number),
}) {}
type Active = {
info: Struct.Mutable<Info>
next: Status | undefined
done: Deferred.Deferred<Info>
handle: ChildProcessHandle | undefined
}
type State = {
dir: string
root: string
jobs: Map<JobID, Active>
scope: Scope.Scope
}
export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: JobID) => Effect.Effect<Info | undefined>
readonly start: (input: StartInput) => Effect.Effect<Info>
readonly output: (input: OutputInput) => Effect.Effect<Output | undefined>
readonly wait: (input: WaitInput) => Effect.Effect<Info | undefined>
readonly kill: (id: JobID) => Effect.Effect<Info | undefined>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/ShellJob") {}
function spawn(shell: string, name: string, command: string, cwd: string, env: NodeJS.ProcessEnv) {
if (process.platform === "win32" && PS.has(name)) {
return ChildProcess.make(shell, ["-NoLogo", "-NoProfile", "-NonInteractive", "-Command", command], {
cwd,
env,
stdin: "ignore",
detached: false,
})
}
return ChildProcess.make(command, [], {
shell,
cwd,
env,
stdin: "ignore",
detached: process.platform !== "win32",
})
}
const snap = (job: Active) =>
new Info({
...job.info,
id: String(job.info.id),
})
export const layer: Layer.Layer<Service, never, AppFileSystem.Service | ChildProcessSpawner> = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const spawner = yield* ChildProcessSpawner
const append = Effect.fn("ShellJob.append")(function* (job: Active, chunk: string) {
yield* Effect.tryPromise({
try: () => NodeFS.appendFile(job.info.output_path, chunk, "utf8"),
catch: () => new Error("Failed to append shell job output"),
}).pipe(Effect.orDie)
})
const write = Effect.fn("ShellJob.write")(function* (job: Active) {
yield* fs.writeJson(job.info.meta_path, job.info).pipe(Effect.orDie)
})
const end = Effect.fn("ShellJob.end")(function* (job: Active, status: Status, code?: number | null) {
if (job.info.status !== "running") return snap(job)
job.info.status = status
job.info.ended_at = Date.now()
job.info.exit_code = code
job.handle = undefined
job.next = undefined
yield* write(job)
const info = snap(job)
yield* Deferred.succeed(job.done, info).pipe(Effect.ignore)
return info
})
const watch = Effect.fn("ShellJob.watch")(function* (job: Active, timeout?: number) {
const handle = job.handle
if (!handle) return snap(job)
if (timeout) {
yield* Effect.sleep(`${timeout} millis`).pipe(
Effect.andThen(
Effect.gen(function* () {
if (job.info.status !== "running") return
job.next = "timed_out"
yield* handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore)
}),
),
Effect.forkScoped,
)
}
yield* Effect.forkScoped(
Stream.runForEach(Stream.decodeText(handle.all), (chunk) =>
Effect.gen(function* () {
job.info.cursor += chunk.length
yield* append(job, chunk)
}),
),
)
const exit = yield* Effect.exit(handle.exitCode)
if (Exit.isSuccess(exit)) {
const code = Number(exit.value)
return yield* end(job, code === 0 ? "completed" : "failed", code)
}
return yield* end(job, job.next ?? "killed", null)
})
const state = yield* InstanceState.make<State>(
Effect.fn("ShellJob.state")(function* (ctx) {
const dir = ctx.project.vcs ? ctx.worktree : ctx.directory
const root = path.join(dir, ".opencode", "jobs")
const state: State = {
dir: ctx.directory,
root,
jobs: new Map(),
scope: yield* Scope.Scope,
}
yield* fs.ensureDir(root).pipe(Effect.orDie)
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
state.jobs.clear()
}),
)
return state
}),
)
const list: Interface["list"] = Effect.fn("ShellJob.list")(function* () {
const s = yield* InstanceState.get(state)
return Array.from(s.jobs.values())
.map(snap)
.toSorted((a, b) => a.started_at - b.started_at)
})
const get: Interface["get"] = Effect.fn("ShellJob.get")(function* (id: JobID) {
const s = yield* InstanceState.get(state)
const job = s.jobs.get(id)
if (!job) return
return snap(job)
})
const start: Interface["start"] = Effect.fn("ShellJob.start")(function* (input: StartInput) {
const s = yield* InstanceState.get(state)
const id = JobID.ascending()
const dir = path.join(s.root, String(id))
const cwd = input.cwd ?? s.dir
const shell = input.shell ?? Shell.acceptable()
const name = Shell.name(shell)
const handle = yield* Scope.provide(s.scope)(
spawner.spawn(
spawn(shell, name, input.command, cwd, {
...process.env,
...input.env,
}),
),
).pipe(Effect.orDie)
const job: Active = {
info: {
id,
command: input.command,
cwd,
shell,
title: input.title,
status: "running",
pid: Number(handle.pid),
started_at: Date.now(),
output_path: path.join(dir, "output.log"),
meta_path: path.join(dir, "meta.json"),
cursor: 0,
} satisfies Struct.Mutable<Info>,
next: undefined,
done: yield* Deferred.make<Info>(),
handle,
}
s.jobs.set(id, job)
yield* fs.writeWithDirs(job.info.output_path, "").pipe(Effect.orDie)
yield* write(job)
yield* Effect.sync(() => {
Effect.runFork(Scope.provide(s.scope)(watch(job, input.timeout)))
})
return snap(job)
})
const output: Interface["output"] = Effect.fn("ShellJob.output")(function* (input: OutputInput) {
const s = yield* InstanceState.get(state)
const job = s.jobs.get(input.id)
if (!job) return
const cursor = input.cursor ?? 0
const text = yield* fs.readFileString(job.info.output_path).pipe(Effect.catch(() => Effect.succeed("")))
return new Output({
text: cursor >= text.length ? "" : text.slice(cursor),
cursor: text.length,
done: job.info.status !== "running",
})
})
const wait: Interface["wait"] = Effect.fn("ShellJob.wait")(function* (input: WaitInput) {
const s = yield* InstanceState.get(state)
const job = s.jobs.get(input.id)
if (!job) return
if (job.info.status !== "running") return snap(job)
if (!input.timeout) return yield* Deferred.await(job.done)
return yield* Effect.raceAll([
Deferred.await(job.done),
Effect.sleep(`${input.timeout} millis`).pipe(Effect.as(snap(job))),
])
})
const kill: Interface["kill"] = Effect.fn("ShellJob.kill")(function* (id: JobID) {
const s = yield* InstanceState.get(state)
const job = s.jobs.get(id)
if (!job) return
if (job.info.status !== "running") return snap(job)
if (!job.handle) return snap(job)
if (!job.next) job.next = "killed"
yield* job.handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore)
return yield* Deferred.await(job.done)
})
return Service.of({
list,
get,
start,
output,
wait,
kill,
})
}),
)
}

View File

@@ -0,0 +1,10 @@
import { Schema } from "effect"
import { Identifier } from "@/id/id"
import { Newtype } from "@/util/schema"
export class JobID extends Newtype<JobID>()("JobID", Schema.String) {
static ascending(id?: string): JobID {
return this.make(Identifier.ascending("job", id))
}
}

View File

@@ -0,0 +1,152 @@
import { describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { AppFileSystem } from "../../src/filesystem"
import { Instance } from "../../src/project/instance"
import { Shell } from "../../src/shell/shell"
import { ShellJob } from "../../src/shell-job"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const it = testEffect(
Layer.mergeAll(
CrossSpawnSpawner.defaultLayer,
ShellJob.layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(AppFileSystem.defaultLayer)),
),
)
const quote = (text: string) => `"${text}"`
const squote = (text: string) => `'${text}'`
const shell = () => Shell.name(Shell.acceptable())
const evalarg = (text: string) => (shell() === "cmd" ? quote(text) : squote(text))
const node = (script: string) => {
const text = `${quote(process.execPath.replaceAll("\\", "/"))} -e ${evalarg(script)}`
if (shell() === "powershell" || shell() === "pwsh") return `& ${text}`
return text
}
const alive = (pid: number) => {
try {
process.kill(pid, 0)
return true
} catch {
return false
}
}
describe("shell-job", () => {
it.live("captures output and persists spool files", () =>
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const jobs = yield* ShellJob.Service
const job = yield* jobs.start({
command: node('process.stdout.write("ok")'),
cwd: dir,
title: "ok",
})
const done = yield* jobs.wait({ id: job.id })
const out = yield* jobs.output({ id: job.id })
expect(done).toBeDefined()
expect(done?.status).toBe("completed")
expect(done?.pid).toBeGreaterThan(0)
expect(out).toEqual({ text: "ok", cursor: 2, done: true })
const log = yield* Effect.promise(() => Bun.file(done!.output_path).text())
const meta = yield* Effect.promise(() => Bun.file(done!.meta_path).json())
expect(log).toBe("ok")
expect(meta).toMatchObject({
id: done!.id,
status: "completed",
title: "ok",
cursor: 2,
})
}),
),
)
it.live("reads output incrementally with a cursor", () =>
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const jobs = yield* ShellJob.Service
const job = yield* jobs.start({
command: node(
'process.stdout.write("a"); setTimeout(() => process.stdout.write("b"), 200); setTimeout(() => process.exit(0), 350)',
),
cwd: dir,
})
yield* Effect.sleep("100 millis")
const a = yield* jobs.output({ id: job.id })
const done = yield* jobs.wait({ id: job.id })
const b = yield* jobs.output({ id: job.id, cursor: a?.cursor ?? 0 })
expect(a).toEqual({ text: "a", cursor: 1, done: false })
expect(done?.status).toBe("completed")
expect(b).toEqual({ text: "b", cursor: 2, done: true })
}),
),
)
it.live("marks non-zero exits as failed", () =>
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const jobs = yield* ShellJob.Service
const job = yield* jobs.start({
command: node('process.stderr.write("bad"); process.exit(7)'),
cwd: dir,
})
const done = yield* jobs.wait({ id: job.id })
const out = yield* jobs.output({ id: job.id })
expect(done).toBeDefined()
expect(done?.status).toBe("failed")
expect(done?.exit_code).toBe(7)
expect(out?.text).toBe("bad")
expect(out?.done).toBe(true)
}),
),
)
it.live("kills a running job and returns final state", () =>
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const jobs = yield* ShellJob.Service
const job = yield* jobs.start({
command: node("setInterval(() => {}, 1000)"),
cwd: dir,
})
yield* Effect.sleep("50 millis")
const done = yield* jobs.kill(job.id)
expect(done).toBeDefined()
expect(done?.status).toBe("killed")
expect(done?.exit_code).toBeNull()
}),
),
)
it.live("kills running jobs when the instance is disposed", () => {
if (process.platform === "win32") return Effect.void
return provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const jobs = yield* ShellJob.Service
const job = yield* jobs.start({
command: node("setInterval(() => {}, 1000)"),
cwd: dir,
})
expect(job.pid).toBeGreaterThan(0)
yield* Effect.sleep("50 millis")
expect(alive(job.pid!)).toBe(true)
yield* Effect.promise(() => Instance.dispose())
yield* Effect.sleep("100 millis")
expect(alive(job.pid!)).toBe(false)
}),
)
})
})