Compare commits

...

18 Commits

Author SHA1 Message Date
Kit Langton
958a3e5b2f refactor(file-time): effectify FileTimeService with Semaphore locks
Convert FileTime from Instance.state to Effect ServiceMap.Service.
Replace hand-rolled Promise lock chain with SynchronizedRef<Map> +
Semaphore.withPermits for per-file mutual exclusion. Legacy facade
preserved — callers unchanged.
2026-03-15 15:11:23 -04:00
Kit Langton
e9535666a4 test(file-time): add tests for FileTime read/assert/withLock
7 tests covering: timestamp recording, session scoping, assert on
unread/unchanged/modified files, and withLock serialization.
All pass against the current Instance.state implementation.
2026-03-15 14:16:33 -04:00
Kit Langton
8c566cb764 refactor(vcs): effectify VcsService as scoped service
Convert Vcs from Instance.state namespace to an Effect ServiceMap.Service
on the Instances LayerMap. Uses Instance.bind for the Bus.subscribe
callback (same ALS pattern as FileWatcherService). Branch state is
managed in the layer closure with scope-based cleanup.
2026-03-15 14:05:57 -04:00
Kit Langton
388c173b56 fix(vcs): invert HEAD filter so branch changes are detected
The condition was `endsWith("HEAD")` (skip HEAD events) instead of
`!endsWith("HEAD")` (only process HEAD events). Since the git watcher
only watches HEAD, this meant BranchUpdated never fired.
2026-03-15 13:42:20 -04:00
Kit Langton
3984bd061e test(vcs): add tests for branch detection and HEAD event publishing
Two tests pass (branch() reads current branch correctly).
Two tests fail: BranchUpdated never fires because the FileWatcher
event filter in vcs.ts has an inverted condition — it skips HEAD
changes instead of filtering for them.
2026-03-15 13:39:55 -04:00
Kit Langton
3701344cce Merge branch 'effectify-watcher' into effectify-file-watcher-service 2026-03-15 13:38:48 -04:00
Kit Langton
701d6c4374 style: format 2026-03-15 13:34:20 -04:00
Kit Langton
f5c81a1343 style: format watcher test 2026-03-15 13:31:51 -04:00
Kit Langton
247bd3d0db fix(watcher): clean up late-arriving subscriptions on timeout
When w.subscribe() takes longer than SUBSCRIBE_TIMEOUT_MS and
eventually resolves, the subscription was left active and unmanaged.
Now the catchCause handler explicitly unsubscribes it, matching
the old withTimeout cleanup behavior.
2026-03-15 13:25:33 -04:00
Kit Langton
0f4df1a1b3 fix(test): remove flaky 100ms race in permission deny test
The deny path is synchronous once the Effect runs, but the
ManagedRuntime can take >100ms to start on slow CI runners.
Just await the result directly — bun's 30s test timeout is
the backstop if it ever hangs.
2026-03-15 12:45:10 -04:00
Kit Langton
59cf7e5848 fix: simplify watcher code and test types
- Use property shorthand `{ init }` in catchCause fallback
- Make BusUpdate.directory required (non-optional)
- Use Effect.runSync instead of Effect.runFork in synchronous callback
2026-03-15 12:37:44 -04:00
Kit Langton
330f315d20 test: stabilize watcher and pty event timing
Probe both root and git watcher readiness before asserting events so the watcher tests stop racing startup. Also make the PTY ordering test use a deterministic short-lived process and keep native PTY callbacks bound to the instance context.
2026-03-15 12:28:26 -04:00
Kit Langton
3e532b2ac8 debug: log experimental filewatcher flag value in CI 2026-03-15 11:52:11 -04:00
Kit Langton
cea3f2c924 docs: add Effect migration patterns to AGENTS.md
Document Instance-scoped services, Instance.bind for ALS context
in native callbacks, and the Flag → Effect.Config migration pattern.
2026-03-15 11:42:47 -04:00
Kit Langton
cb5372c4e8 refactor(watcher): read flags via Effect.Config instead of Flag namespace
Use Effect.Config.boolean with ConfigProvider (defaults to env) so
tests can override flags via ConfigProvider.layer without module
mocking. Removes mock.module that was poisoning Flag getters and
causing cross-test pollution in config/instruction tests.
2026-03-15 11:13:27 -04:00
Kit Langton
4b92f9c55e feat(instance): add Instance.bind for ALS context capture
Add Instance.bind(fn) which captures the current instance ALS context
and returns a wrapper that synchronously restores it when called. This
prevents silent failures when callbacks fire outside the instance async
context (native addons, event emitters, timers).

Use it in FileWatcherService to bind the @parcel/watcher callback,
replacing the async Instance.provide workaround.
2026-03-15 09:45:15 -04:00
Kit Langton
2cbd2a5a2d fix(watcher): restore ALS context in native watcher callback
The @parcel/watcher callback fires from a native C++ uv_async handle
which does not preserve Node.js AsyncLocalStorage context. Wrap the
callback in Instance.provide to restore the instance ALS context so
Bus.publish can resolve instance-scoped state and directory.
2026-03-15 09:39:13 -04:00
Kit Langton
c35dc2245a refactor(watcher): effectify FileWatcher as scoped service
Convert FileWatcher from Instance.state namespace to an Effect
ServiceMap.Service with proper scope-based lifecycle management.
2026-03-14 23:39:31 -04:00
18 changed files with 907 additions and 629 deletions

View File

@@ -34,6 +34,7 @@ Instructions to follow when writing Effect.
- Use `Effect.gen(function* () { ... })` for composition.
- Use `Effect.fn("ServiceName.method")` for named/traced effects and `Effect.fnUntraced` for internal helpers.
- `Effect.fn` / `Effect.fnUntraced` accept pipeable operators as extra arguments, so avoid unnecessary `flow` or outer `.pipe()` wrappers.
- **`Effect.callback`** (not `Effect.async`) for callback-based APIs. The classic `Effect.async` was renamed to `Effect.callback` in effect-smol/v4.
## Time
@@ -42,3 +43,37 @@ Instructions to follow when writing Effect.
## Errors
- In `Effect.gen/fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
## Instance-scoped Effect services
Services that need per-directory lifecycle (created/destroyed per instance) go through the `Instances` LayerMap:
1. Define a `ServiceMap.Service` with a `static readonly layer` (see `FileWatcherService`, `QuestionService`, `PermissionService`, `ProviderAuthService`).
2. Add it to `InstanceServices` union and `Layer.mergeAll(...)` in `src/effect/instances.ts`.
3. Use `InstanceContext` inside the layer to read `directory` and `project` instead of `Instance.*` globals.
4. Call from legacy code via `runPromiseInstance(MyService.use((s) => s.method()))`.
### Instance.bind — ALS context for native callbacks
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and returns a wrapper that restores it synchronously when called.
**Use it** when passing callbacks to native C/C++ addons (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
**Don't need it** for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers — Node.js ALS propagates through those automatically.
```typescript
// Native addon callback — needs Instance.bind
const cb = Instance.bind((err, evts) => {
Bus.publish(MyEvent, { ... })
})
nativeAddon.subscribe(dir, cb)
```
## Flag → Effect.Config migration
Flags in `src/flag/flag.ts` are being migrated from static `truthy(...)` reads to `Config.boolean(...).pipe(Config.withDefault(false))` as their consumers get effectified.
- Effectful flags return `Config<boolean>` and are read with `yield*` inside `Effect.gen`.
- The default `ConfigProvider` reads from `process.env`, so env vars keep working.
- Tests can override via `ConfigProvider.layer(ConfigProvider.fromUnknown({ ... }))`.
- Keep all flags in `flag.ts` as the single registry — just change the implementation from `truthy()` to `Config.boolean()` when the consumer moves to Effect.

View File

@@ -0,0 +1,13 @@
import { ServiceMap } from "effect"
import type { Project } from "@/project/project"
export declare namespace InstanceContext {
export interface Shape {
readonly directory: string
readonly project: Project.Info
}
}
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
"opencode/InstanceContext",
) {}

View File

@@ -1,23 +1,23 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { registerDisposer } from "./instance-registry"
import { InstanceContext } from "./instance-context"
import { ProviderAuthService } from "@/provider/auth-service"
import { QuestionService } from "@/question/service"
import { PermissionService } from "@/permission/service"
import { FileWatcherService } from "@/file/watcher"
import { VcsService } from "@/project/vcs"
import { FileTimeService } from "@/file/time"
import { Instance } from "@/project/instance"
import type { Project } from "@/project/project"
export declare namespace InstanceContext {
export interface Shape {
readonly directory: string
readonly project: Project.Info
}
}
export { InstanceContext } from "./instance-context"
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
"opencode/InstanceContext",
) {}
export type InstanceServices = QuestionService | PermissionService | ProviderAuthService
export type InstanceServices =
| QuestionService
| PermissionService
| ProviderAuthService
| FileWatcherService
| VcsService
| FileTimeService
function lookup(directory: string) {
const project = Instance.project
@@ -26,7 +26,10 @@ function lookup(directory: string) {
Layer.fresh(QuestionService.layer),
Layer.fresh(PermissionService.layer),
Layer.fresh(ProviderAuthService.layer),
).pipe(Layer.provide(ctx))
Layer.fresh(FileWatcherService.layer),
Layer.fresh(VcsService.layer),
Layer.fresh(FileTimeService.layer),
).pipe(Layer.provide(ctx), Layer.orDie)
}
export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<string, InstanceServices>>()(

View File

@@ -1,71 +1,88 @@
import { Instance } from "../project/instance"
import { Log } from "../util/log"
import { Flag } from "../flag/flag"
import { Flag } from "@/flag/flag"
import { Filesystem } from "../util/filesystem"
import { Effect, Layer, ServiceMap, Semaphore } from "effect"
import { runPromiseInstance } from "@/effect/runtime"
export namespace FileTime {
const log = Log.create({ service: "file.time" })
// Per-session read times plus per-file write locks.
// All tools that overwrite existing files should run their
// assert/read/write/update sequence inside withLock(filepath, ...)
// so concurrent writes to the same file are serialized.
export const state = Instance.state(() => {
const read: {
[sessionID: string]: {
[path: string]: Date | undefined
const log = Log.create({ service: "file.time" })
export namespace FileTimeService {
export interface Service {
readonly read: (sessionID: string, file: string) => Effect.Effect<void>
readonly get: (sessionID: string, file: string) => Effect.Effect<Date | undefined>
readonly assert: (sessionID: string, filepath: string) => Effect.Effect<void>
readonly withLock: <T>(filepath: string, fn: () => Promise<T>) => Effect.Effect<T>
}
}
export class FileTimeService extends ServiceMap.Service<FileTimeService, FileTimeService.Service>()(
"@opencode/FileTime",
) {
static readonly layer = Layer.effect(
FileTimeService,
Effect.gen(function* () {
const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
const reads: { [sessionID: string]: { [path: string]: Date | undefined } } = {}
const locks = new Map<string, Semaphore.Semaphore>()
function getLock(filepath: string) {
let lock = locks.get(filepath)
if (!lock) {
lock = Semaphore.makeUnsafe(1)
locks.set(filepath, lock)
}
return lock
}
} = {}
const locks = new Map<string, Promise<void>>()
return {
read,
locks,
}
})
return FileTimeService.of({
read: Effect.fn("FileTimeService.read")(function* (sessionID: string, file: string) {
log.info("read", { sessionID, file })
reads[sessionID] = reads[sessionID] || {}
reads[sessionID][file] = new Date()
}),
get: Effect.fn("FileTimeService.get")(function* (sessionID: string, file: string) {
return reads[sessionID]?.[file]
}),
assert: Effect.fn("FileTimeService.assert")(function* (sessionID: string, filepath: string) {
if (disableCheck) return
const time = reads[sessionID]?.[filepath]
if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
const mtime = Filesystem.stat(filepath)?.mtime
if (mtime && mtime.getTime() > time.getTime() + 50) {
throw new Error(
`File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`,
)
}
}),
withLock: Effect.fn("FileTimeService.withLock")(function* <T>(filepath: string, fn: () => Promise<T>) {
const lock = getLock(filepath)
return yield* Effect.promise(fn).pipe(lock.withPermits(1))
}),
})
}),
)
}
// Legacy facade — callers don't need to change
export namespace FileTime {
export function read(sessionID: string, file: string) {
log.info("read", { sessionID, file })
const { read } = state()
read[sessionID] = read[sessionID] || {}
read[sessionID][file] = new Date()
// Fire-and-forget — callers never await this
runPromiseInstance(FileTimeService.use((s) => s.read(sessionID, file)))
}
export function get(sessionID: string, file: string) {
return state().read[sessionID]?.[file]
}
export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
const current = state()
const currentLock = current.locks.get(filepath) ?? Promise.resolve()
let release: () => void = () => {}
const nextLock = new Promise<void>((resolve) => {
release = resolve
})
const chained = currentLock.then(() => nextLock)
current.locks.set(filepath, chained)
await currentLock
try {
return await fn()
} finally {
release()
if (current.locks.get(filepath) === chained) {
current.locks.delete(filepath)
}
}
return runPromiseInstance(FileTimeService.use((s) => s.get(sessionID, file)))
}
export async function assert(sessionID: string, filepath: string) {
if (Flag.OPENCODE_DISABLE_FILETIME_CHECK === true) {
return
}
return runPromiseInstance(FileTimeService.use((s) => s.assert(sessionID, filepath)))
}
const time = get(sessionID, filepath)
if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
const mtime = Filesystem.stat(filepath)?.mtime
// Allow a 50ms tolerance for Windows NTFS timestamp fuzziness / async flushing
if (mtime && mtime.getTime() > time.getTime() + 50) {
throw new Error(
`File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`,
)
}
export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
return runPromiseInstance(FileTimeService.use((s) => s.withLock(filepath, fn)))
}
}

View File

@@ -1,7 +1,8 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { Instance } from "@/project/instance"
import z from "zod"
import { Instance } from "../project/instance"
import { Log } from "../util/log"
import { FileIgnore } from "./ignore"
import { Config } from "../config/config"
@@ -9,118 +10,139 @@ import path from "path"
// @ts-ignore
import { createWrapper } from "@parcel/watcher/wrapper"
import { lazy } from "@/util/lazy"
import { withTimeout } from "@/util/timeout"
import type ParcelWatcher from "@parcel/watcher"
import { Flag } from "@/flag/flag"
import { readdir } from "fs/promises"
import { git } from "@/util/git"
import { Protected } from "./protected"
import { Flag } from "@/flag/flag"
import { Cause, Effect, Layer, ServiceMap } from "effect"
const SUBSCRIBE_TIMEOUT_MS = 10_000
declare const OPENCODE_LIBC: string | undefined
export namespace FileWatcher {
const log = Log.create({ service: "file.watcher" })
const log = Log.create({ service: "file.watcher" })
export const Event = {
Updated: BusEvent.define(
"file.watcher.updated",
z.object({
file: z.string(),
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
}),
),
const event = {
Updated: BusEvent.define(
"file.watcher.updated",
z.object({
file: z.string(),
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
}),
),
}
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
try {
const binding = require(
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
)
return createWrapper(binding) as typeof import("@parcel/watcher")
} catch (error) {
log.error("failed to load watcher binding", { error })
return
}
})
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
try {
const binding = require(
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
)
return createWrapper(binding) as typeof import("@parcel/watcher")
} catch (error) {
log.error("failed to load watcher binding", { error })
return
}
})
function getBackend() {
if (process.platform === "win32") return "windows"
if (process.platform === "darwin") return "fs-events"
if (process.platform === "linux") return "inotify"
}
const state = Instance.state(
async () => {
log.info("init")
const cfg = await Config.get()
const backend = (() => {
if (process.platform === "win32") return "windows"
if (process.platform === "darwin") return "fs-events"
if (process.platform === "linux") return "inotify"
})()
if (!backend) {
log.error("watcher backend not supported", { platform: process.platform })
return {}
}
log.info("watcher backend", { platform: process.platform, backend })
export namespace FileWatcher {
export const Event = event
/** Whether the native @parcel/watcher binding is available on this platform. */
export const hasNativeBinding = () => !!watcher()
}
const w = watcher()
if (!w) return {}
const init = Effect.fn("FileWatcherService.init")(function* () {})
const subscribe: ParcelWatcher.SubscribeCallback = (err, evts) => {
if (err) return
for (const evt of evts) {
if (evt.type === "create") Bus.publish(Event.Updated, { file: evt.path, event: "add" })
if (evt.type === "update") Bus.publish(Event.Updated, { file: evt.path, event: "change" })
if (evt.type === "delete") Bus.publish(Event.Updated, { file: evt.path, event: "unlink" })
}
}
const subs: ParcelWatcher.AsyncSubscription[] = []
const cfgIgnores = cfg.watcher?.ignore ?? []
if (Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
const pending = w.subscribe(Instance.directory, subscribe, {
ignore: [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()],
backend,
})
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
log.error("failed to subscribe to Instance.directory", { error: err })
pending.then((s) => s.unsubscribe()).catch(() => {})
return undefined
})
if (sub) subs.push(sub)
}
if (Instance.project.vcs === "git") {
const result = await git(["rev-parse", "--git-dir"], {
cwd: Instance.worktree,
})
const vcsDir = result.exitCode === 0 ? path.resolve(Instance.worktree, result.text().trim()) : undefined
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
const gitDirContents = await readdir(vcsDir).catch(() => [])
const ignoreList = gitDirContents.filter((entry) => entry !== "HEAD")
const pending = w.subscribe(vcsDir, subscribe, {
ignore: ignoreList,
backend,
})
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
log.error("failed to subscribe to vcsDir", { error: err })
pending.then((s) => s.unsubscribe()).catch(() => {})
return undefined
})
if (sub) subs.push(sub)
}
}
return { subs }
},
async (state) => {
if (!state.subs) return
await Promise.all(state.subs.map((sub) => sub?.unsubscribe()))
},
)
export function init() {
if (Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) {
return
}
state()
export namespace FileWatcherService {
export interface Service {
readonly init: () => Effect.Effect<void>
}
}
export class FileWatcherService extends ServiceMap.Service<FileWatcherService, FileWatcherService.Service>()(
"@opencode/FileWatcher",
) {
static readonly layer = Layer.effect(
FileWatcherService,
Effect.gen(function* () {
const instance = yield* InstanceContext
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
log.info("init", { directory: instance.directory })
const backend = getBackend()
if (!backend) {
log.error("watcher backend not supported", { directory: instance.directory, platform: process.platform })
return FileWatcherService.of({ init })
}
const w = watcher()
if (!w) return FileWatcherService.of({ init })
log.info("watcher backend", { directory: instance.directory, platform: process.platform, backend })
const subs: ParcelWatcher.AsyncSubscription[] = []
yield* Effect.addFinalizer(() => Effect.promise(() => Promise.allSettled(subs.map((sub) => sub.unsubscribe()))))
const cb: ParcelWatcher.SubscribeCallback = Instance.bind((err, evts) => {
if (err) return
for (const evt of evts) {
if (evt.type === "create") Bus.publish(event.Updated, { file: evt.path, event: "add" })
if (evt.type === "update") Bus.publish(event.Updated, { file: evt.path, event: "change" })
if (evt.type === "delete") Bus.publish(event.Updated, { file: evt.path, event: "unlink" })
}
})
const subscribe = (dir: string, ignore: string[]) => {
const pending = w.subscribe(dir, cb, { ignore, backend })
return Effect.gen(function* () {
const sub = yield* Effect.promise(() => pending)
subs.push(sub)
}).pipe(
Effect.timeout(SUBSCRIBE_TIMEOUT_MS),
Effect.catchCause((cause) => {
log.error("failed to subscribe", { dir, cause: Cause.pretty(cause) })
// Clean up a subscription that resolves after timeout
pending.then((s) => s.unsubscribe()).catch(() => {})
return Effect.void
}),
)
}
const cfg = yield* Effect.promise(() => Config.get())
const cfgIgnores = cfg.watcher?.ignore ?? []
if (yield* Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
yield* subscribe(instance.directory, [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()])
}
if (instance.project.vcs === "git") {
const result = yield* Effect.promise(() =>
git(["rev-parse", "--git-dir"], {
cwd: instance.project.worktree,
}),
)
const vcsDir = result.exitCode === 0 ? path.resolve(instance.project.worktree, result.text().trim()) : undefined
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
const ignore = (yield* Effect.promise(() => readdir(vcsDir).catch(() => []))).filter(
(entry) => entry !== "HEAD",
)
yield* subscribe(vcsDir, ignore)
}
}
return FileWatcherService.of({ init })
}).pipe(
Effect.catchCause((cause) => {
log.error("failed to init watcher service", { cause: Cause.pretty(cause) })
return Effect.succeed(FileWatcherService.of({ init }))
}),
),
)
}

View File

@@ -1,3 +1,5 @@
import { Config } from "effect"
function truthy(key: string) {
const value = process.env[key]?.toLowerCase()
return value === "true" || value === "1"
@@ -40,8 +42,12 @@ export namespace Flag {
// Experimental
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_FILEWATCHER")
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER")
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = Config.boolean("OPENCODE_EXPERIMENTAL_FILEWATCHER").pipe(
Config.withDefault(false),
)
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = Config.boolean(
"OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER",
).pipe(Config.withDefault(false))
export const OPENCODE_EXPERIMENTAL_ICON_DISCOVERY =
OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_ICON_DISCOVERY")
@@ -55,7 +61,9 @@ export namespace Flag {
export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT")
export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY")
export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL")
export const OPENCODE_DISABLE_FILETIME_CHECK = truthy("OPENCODE_DISABLE_FILETIME_CHECK")
export const OPENCODE_DISABLE_FILETIME_CHECK = Config.boolean("OPENCODE_DISABLE_FILETIME_CHECK").pipe(
Config.withDefault(false),
)
export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")

View File

@@ -1,6 +1,6 @@
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceContext } from "@/effect/instances"
import { InstanceContext } from "@/effect/instance-context"
import { ProjectID } from "@/project/schema"
import { MessageID, SessionID } from "@/session/schema"
import { PermissionTable } from "@/session/session.sql"

View File

@@ -1,17 +1,18 @@
import { Plugin } from "../plugin"
import { Format } from "../format"
import { LSP } from "../lsp"
import { FileWatcher } from "../file/watcher"
import { FileWatcherService } from "../file/watcher"
import { File } from "../file"
import { Project } from "./project"
import { Bus } from "../bus"
import { Command } from "../command"
import { Instance } from "./instance"
import { Vcs } from "./vcs"
import { VcsService } from "./vcs"
import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Snapshot } from "../snapshot"
import { Truncate } from "../tool/truncation"
import { runPromiseInstance } from "@/effect/runtime"
export async function InstanceBootstrap() {
Log.Default.info("bootstrapping", { directory: Instance.directory })
@@ -19,9 +20,9 @@ export async function InstanceBootstrap() {
ShareNext.init()
Format.init()
await LSP.init()
FileWatcher.init()
await runPromiseInstance(FileWatcherService.use((service) => service.init()))
File.init()
Vcs.init()
await runPromiseInstance(VcsService.use((s) => s.init()))
Snapshot.init()
Truncate.init()

View File

@@ -101,6 +101,15 @@ export const Instance = {
if (Instance.worktree === "/") return false
return Filesystem.contains(Instance.worktree, filepath)
},
/**
* Captures the current instance ALS context and returns a wrapper that
* restores it when called. Use this for callbacks that fire outside the
* instance async context (native addons, event emitters, timers, etc.).
*/
bind<F extends (...args: any[]) => any>(fn: F): F {
const ctx = context.use()
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
},
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
return State.create(() => Instance.directory, init, dispose)
},

View File

@@ -1,11 +1,12 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import path from "path"
import z from "zod"
import { Log } from "@/util/log"
import { Instance } from "./instance"
import { InstanceContext } from "@/effect/instance-context"
import { FileWatcher } from "@/file/watcher"
import { git } from "@/util/git"
import { Effect, Layer, ServiceMap } from "effect"
const log = Log.create({ service: "vcs" })
@@ -27,50 +28,57 @@ export namespace Vcs {
ref: "VcsInfo",
})
export type Info = z.infer<typeof Info>
}
async function currentBranch() {
const result = await git(["rev-parse", "--abbrev-ref", "HEAD"], {
cwd: Instance.worktree,
})
if (result.exitCode !== 0) return
const text = result.text().trim()
if (!text) return
return text
}
const state = Instance.state(
async () => {
if (Instance.project.vcs !== "git") {
return { branch: async () => undefined, unsubscribe: undefined }
}
let current = await currentBranch()
log.info("initialized", { branch: current })
const unsubscribe = Bus.subscribe(FileWatcher.Event.Updated, async (evt) => {
if (evt.properties.file.endsWith("HEAD")) return
const next = await currentBranch()
if (next !== current) {
log.info("branch changed", { from: current, to: next })
current = next
Bus.publish(Event.BranchUpdated, { branch: next })
}
})
return {
branch: async () => current,
unsubscribe,
}
},
async (state) => {
state.unsubscribe?.()
},
)
export async function init() {
return state()
}
export async function branch() {
return await state().then((s) => s.branch())
export namespace VcsService {
export interface Service {
readonly init: () => Effect.Effect<void>
readonly branch: () => Effect.Effect<string | undefined>
}
}
export class VcsService extends ServiceMap.Service<VcsService, VcsService.Service>()("@opencode/Vcs") {
static readonly layer = Layer.effect(
VcsService,
Effect.gen(function* () {
const instance = yield* InstanceContext
let current: string | undefined
if (instance.project.vcs === "git") {
const currentBranch = async () => {
const result = await git(["rev-parse", "--abbrev-ref", "HEAD"], {
cwd: instance.project.worktree,
})
if (result.exitCode !== 0) return undefined
const text = result.text().trim()
return text || undefined
}
current = yield* Effect.promise(() => currentBranch())
log.info("initialized", { branch: current })
const unsubscribe = Bus.subscribe(
FileWatcher.Event.Updated,
Instance.bind(async (evt) => {
if (!evt.properties.file.endsWith("HEAD")) return
const next = await currentBranch()
if (next !== current) {
log.info("branch changed", { from: current, to: next })
current = next
Bus.publish(Vcs.Event.BranchUpdated, { branch: next })
}
}),
)
yield* Effect.addFinalizer(() => Effect.sync(unsubscribe))
}
return VcsService.of({
init: Effect.fn("VcsService.init")(function* () {}),
branch: Effect.fn("VcsService.branch")(function* () {
return current
}),
})
}),
)
}

View File

@@ -167,40 +167,44 @@ export namespace Pty {
subscribers: new Map(),
}
state().set(id, session)
ptyProcess.onData((chunk) => {
session.cursor += chunk.length
ptyProcess.onData(
Instance.bind((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
})
ptyProcess.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
remove(id)
})
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
}),
)
ptyProcess.onExit(
Instance.bind(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
remove(id)
}),
)
Bus.publish(Event.Created, { info })
return info
}

View File

@@ -14,7 +14,8 @@ import { LSP } from "../lsp"
import { Format } from "../format"
import { TuiRoutes } from "./routes/tui"
import { Instance } from "../project/instance"
import { Vcs } from "../project/vcs"
import { Vcs, VcsService } from "../project/vcs"
import { runPromiseInstance } from "@/effect/runtime"
import { Agent } from "../agent/agent"
import { Skill } from "../skill/skill"
import { Auth } from "../auth"
@@ -330,7 +331,7 @@ export namespace Server {
},
}),
async (c) => {
const branch = await Vcs.branch()
const branch = await runPromiseInstance(VcsService.use((s) => s.branch()))
return c.json({
branch,
})

View File

@@ -1,361 +1,126 @@
import { describe, test, expect, beforeEach } from "bun:test"
import path from "path"
import { afterEach, expect, test } from "bun:test"
import fs from "fs/promises"
import { FileTime } from "../../src/file/time"
import { Instance } from "../../src/project/instance"
import { Filesystem } from "../../src/util/filesystem"
import path from "path"
import { tmpdir } from "../fixture/fixture"
import { Instance } from "../../src/project/instance"
import { FileTime } from "../../src/file/time"
describe("file/time", () => {
const sessionID = "test-session-123"
afterEach(() => Instance.disposeAll())
describe("read() and get()", () => {
test("stores read timestamp", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
test("read records a timestamp for a file", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "test.txt")
await fs.writeFile(file, "hello")
await Instance.provide({
directory: tmp.path,
fn: async () => {
const before = FileTime.get(sessionID, filepath)
expect(before).toBeUndefined()
FileTime.read(sessionID, filepath)
const after = FileTime.get(sessionID, filepath)
expect(after).toBeInstanceOf(Date)
expect(after!.getTime()).toBeGreaterThan(0)
},
})
})
test("tracks separate timestamps per session", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read("session1", filepath)
FileTime.read("session2", filepath)
const time1 = FileTime.get("session1", filepath)
const time2 = FileTime.get("session2", filepath)
expect(time1).toBeDefined()
expect(time2).toBeDefined()
},
})
})
test("updates timestamp on subsequent reads", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
const first = FileTime.get(sessionID, filepath)!
await new Promise((resolve) => setTimeout(resolve, 10))
FileTime.read(sessionID, filepath)
const second = FileTime.get(sessionID, filepath)!
expect(second.getTime()).toBeGreaterThanOrEqual(first.getTime())
},
})
})
})
describe("assert()", () => {
test("passes when file has not been modified", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
// Should not throw
await FileTime.assert(sessionID, filepath)
},
})
})
test("throws when file was not read first", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow("You must read file")
},
})
})
test("throws when file was modified after read", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
// Wait to ensure different timestamps
await new Promise((resolve) => setTimeout(resolve, 100))
// Modify file after reading
await fs.writeFile(filepath, "modified content", "utf-8")
await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow("modified since it was last read")
},
})
})
test("includes timestamps in error message", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
await new Promise((resolve) => setTimeout(resolve, 100))
await fs.writeFile(filepath, "modified", "utf-8")
let error: Error | undefined
try {
await FileTime.assert(sessionID, filepath)
} catch (e) {
error = e as Error
}
expect(error).toBeDefined()
expect(error!.message).toContain("Last modification:")
expect(error!.message).toContain("Last read:")
},
})
})
test("skips check when OPENCODE_DISABLE_FILETIME_CHECK is true", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
const { Flag } = await import("../../src/flag/flag")
const original = Flag.OPENCODE_DISABLE_FILETIME_CHECK
;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = true
try {
// Should not throw even though file wasn't read
await FileTime.assert(sessionID, filepath)
} finally {
;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = original
}
},
})
})
})
describe("withLock()", () => {
test("executes function within lock", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
let executed = false
await FileTime.withLock(filepath, async () => {
executed = true
return "result"
})
expect(executed).toBe(true)
},
})
})
test("returns function result", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
const result = await FileTime.withLock(filepath, async () => {
return "success"
})
expect(result).toBe("success")
},
})
})
test("serializes concurrent operations on same file", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
const order: number[] = []
const op1 = FileTime.withLock(filepath, async () => {
order.push(1)
await new Promise((resolve) => setTimeout(resolve, 10))
order.push(2)
})
const op2 = FileTime.withLock(filepath, async () => {
order.push(3)
order.push(4)
})
await Promise.all([op1, op2])
// Operations should be serialized
expect(order).toContain(1)
expect(order).toContain(2)
expect(order).toContain(3)
expect(order).toContain(4)
},
})
})
test("allows concurrent operations on different files", async () => {
await using tmp = await tmpdir()
const filepath1 = path.join(tmp.path, "file1.txt")
const filepath2 = path.join(tmp.path, "file2.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
let started1 = false
let started2 = false
const op1 = FileTime.withLock(filepath1, async () => {
started1 = true
await new Promise((resolve) => setTimeout(resolve, 50))
expect(started2).toBe(true) // op2 should have started while op1 is running
})
const op2 = FileTime.withLock(filepath2, async () => {
started2 = true
})
await Promise.all([op1, op2])
expect(started1).toBe(true)
expect(started2).toBe(true)
},
})
})
test("releases lock even if function throws", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
await expect(
FileTime.withLock(filepath, async () => {
throw new Error("Test error")
}),
).rejects.toThrow("Test error")
// Lock should be released, subsequent operations should work
let executed = false
await FileTime.withLock(filepath, async () => {
executed = true
})
expect(executed).toBe(true)
},
})
})
test("deadlocks on nested locks (expected behavior)", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await Instance.provide({
directory: tmp.path,
fn: async () => {
// Nested locks on same file cause deadlock - this is expected
// The outer lock waits for inner to complete, but inner waits for outer to release
const timeout = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("Deadlock detected")), 100),
)
const nestedLock = FileTime.withLock(filepath, async () => {
return FileTime.withLock(filepath, async () => {
return "inner"
})
})
// Should timeout due to deadlock
await expect(Promise.race([nestedLock, timeout])).rejects.toThrow("Deadlock detected")
},
})
})
})
describe("stat() Filesystem.stat pattern", () => {
test("reads file modification time via Filesystem.stat()", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "content", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
const stats = Filesystem.stat(filepath)
expect(stats?.mtime).toBeInstanceOf(Date)
expect(stats!.mtime.getTime()).toBeGreaterThan(0)
// FileTime.assert uses this stat internally
await FileTime.assert(sessionID, filepath)
},
})
})
test("detects modification via stat mtime", async () => {
await using tmp = await tmpdir()
const filepath = path.join(tmp.path, "file.txt")
await fs.writeFile(filepath, "original", "utf-8")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read(sessionID, filepath)
const originalStat = Filesystem.stat(filepath)
// Wait and modify
await new Promise((resolve) => setTimeout(resolve, 100))
await fs.writeFile(filepath, "modified", "utf-8")
const newStat = Filesystem.stat(filepath)
expect(newStat!.mtime.getTime()).toBeGreaterThan(originalStat!.mtime.getTime())
await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow()
},
})
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const before = new Date()
FileTime.read("session-1", file)
// read is fire-and-forget async — wait a tick for it to complete
await Bun.sleep(10)
const time = await FileTime.get("session-1", file)
expect(time).toBeDefined()
expect(time!.getTime()).toBeGreaterThanOrEqual(before.getTime())
},
})
})
test("get returns undefined for unread files", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const time = await FileTime.get("session-1", "/nonexistent")
expect(time).toBeUndefined()
},
})
})
test("read times are scoped per session", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "test.txt")
await fs.writeFile(file, "hello")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read("session-a", file)
await Bun.sleep(10)
expect(await FileTime.get("session-a", file)).toBeDefined()
expect(await FileTime.get("session-b", file)).toBeUndefined()
},
})
})
test("assert throws if file was not read first", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "test.txt")
await fs.writeFile(file, "hello")
await Instance.provide({
directory: tmp.path,
fn: async () => {
await expect(FileTime.assert("session-1", file)).rejects.toThrow("must read file")
},
})
})
test("assert passes if file unchanged since read", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "test.txt")
await fs.writeFile(file, "hello")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read("session-1", file)
await Bun.sleep(10)
await FileTime.assert("session-1", file)
},
})
})
test("assert throws if file modified after read", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "test.txt")
await fs.writeFile(file, "hello")
await Instance.provide({
directory: tmp.path,
fn: async () => {
FileTime.read("session-1", file)
await Bun.sleep(100)
await fs.writeFile(file, "modified")
await expect(FileTime.assert("session-1", file)).rejects.toThrow("has been modified")
},
})
})
test("withLock serializes concurrent writes to same file", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "locked.txt")
await fs.writeFile(file, "")
await Instance.provide({
directory: tmp.path,
fn: async () => {
const order: number[] = []
const a = FileTime.withLock(file, async () => {
order.push(1)
await Bun.sleep(50)
order.push(2)
})
const b = FileTime.withLock(file, async () => {
order.push(3)
await Bun.sleep(10)
order.push(4)
})
await Promise.all([a, b])
expect(order).toEqual([1, 2, 3, 4])
},
})
})

View File

@@ -0,0 +1,236 @@
import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Deferred, Effect, Fiber, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcherService. */
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
return withServices(
directory,
FileWatcherService.layer,
async (rt) => {
await rt.runPromise(FileWatcherService.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
},
{ provide: [watcherConfigLayer] },
)
}
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
let done = false
function on(evt: BusUpdate) {
if (done) return
if (evt.directory !== directory) return
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
if (!check(evt.payload.properties)) return
hit(evt.payload.properties)
}
function cleanup() {
if (done) return
done = true
GlobalBus.off("event", on)
}
GlobalBus.on("event", on)
return cleanup
}
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
return Effect.callback<WatcherEvent>((resume) => {
const cleanup = listen(directory, check, (evt) => {
cleanup()
resume(Effect.succeed(evt))
})
return Effect.sync(cleanup)
}).pipe(Effect.timeout("5 seconds"))
}
function nextUpdate<E>(directory: string, check: (evt: WatcherEvent) => boolean, trigger: Effect.Effect<void, E>) {
return Effect.acquireUseRelease(
wait(directory, check).pipe(Effect.forkChild({ startImmediately: true })),
(fiber) =>
Effect.gen(function* () {
yield* trigger
return yield* Fiber.join(fiber)
}),
Fiber.interrupt,
)
}
/** Effect that asserts no matching event arrives within `ms`. */
function noUpdate<E>(
directory: string,
check: (evt: WatcherEvent) => boolean,
trigger: Effect.Effect<void, E>,
ms = 500,
) {
return Effect.gen(function* () {
const deferred = yield* Deferred.make<WatcherEvent>()
yield* Effect.acquireUseRelease(
Effect.sync(() =>
listen(directory, check, (evt) => {
Effect.runSync(Deferred.succeed(deferred, evt))
}),
),
() =>
Effect.gen(function* () {
yield* trigger
expect(yield* Deferred.await(deferred).pipe(Effect.timeoutOption(`${ms} millis`))).toEqual(Option.none())
}),
(cleanup) => Effect.sync(cleanup),
)
})
}
function ready(directory: string) {
const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`)
const head = path.join(directory, ".git", "HEAD")
return Effect.gen(function* () {
yield* nextUpdate(
directory,
(evt) => evt.file === file && evt.event === "add",
Effect.promise(() => fs.writeFile(file, "ready")),
).pipe(Effect.ensuring(Effect.promise(() => fs.rm(file, { force: true }).catch(() => undefined))), Effect.asVoid)
const git = yield* Effect.promise(() =>
fs
.stat(head)
.then(() => true)
.catch(() => false),
)
if (!git) return
const branch = `watch-${Math.random().toString(36).slice(2)}`
const hash = yield* Effect.promise(() => $`git rev-parse HEAD`.cwd(directory).quiet().text())
yield* nextUpdate(
directory,
(evt) => evt.file === head && evt.event !== "unlink",
Effect.promise(async () => {
await fs.writeFile(path.join(directory, ".git", "refs", "heads", branch), hash.trim() + "\n")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
}),
).pipe(Effect.asVoid)
})
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describeWatcher("FileWatcherService", () => {
afterEach(() => Instance.disposeAll())
test("publishes root create, update, and delete events", async () => {
await using tmp = await tmpdir({ git: true })
const file = path.join(tmp.path, "watch.txt")
const dir = tmp.path
const cases = [
{ event: "add" as const, trigger: Effect.promise(() => fs.writeFile(file, "a")) },
{ event: "change" as const, trigger: Effect.promise(() => fs.writeFile(file, "b")) },
{ event: "unlink" as const, trigger: Effect.promise(() => fs.unlink(file)) },
]
await withWatcher(
dir,
Effect.forEach(cases, ({ event, trigger }) =>
nextUpdate(dir, (evt) => evt.file === file && evt.event === event, trigger).pipe(
Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event }))),
),
),
)
})
test("watches non-git roots", async () => {
await using tmp = await tmpdir()
const file = path.join(tmp.path, "plain.txt")
const dir = tmp.path
await withWatcher(
dir,
nextUpdate(
dir,
(e) => e.file === file && e.event === "add",
Effect.promise(() => fs.writeFile(file, "plain")),
).pipe(Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event: "add" })))),
)
})
test("cleanup stops publishing events", async () => {
await using tmp = await tmpdir({ git: true })
const file = path.join(tmp.path, "after-dispose.txt")
// Start and immediately stop the watcher (withWatcher disposes on exit)
await withWatcher(tmp.path, Effect.void)
// Now write a file — no watcher should be listening
await Effect.runPromise(
noUpdate(
tmp.path,
(e) => e.file === file,
Effect.promise(() => fs.writeFile(file, "gone")),
),
)
})
test("ignores .git/index changes", async () => {
await using tmp = await tmpdir({ git: true })
const gitIndex = path.join(tmp.path, ".git", "index")
const edit = path.join(tmp.path, "tracked.txt")
await withWatcher(
tmp.path,
noUpdate(
tmp.path,
(e) => e.file === gitIndex,
Effect.promise(async () => {
await fs.writeFile(edit, "a")
await $`git add .`.cwd(tmp.path).quiet().nothrow()
}),
),
)
})
test("publishes .git/HEAD events", async () => {
await using tmp = await tmpdir({ git: true })
const head = path.join(tmp.path, ".git", "HEAD")
const branch = `watch-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withWatcher(
tmp.path,
nextUpdate(
tmp.path,
(evt) => evt.file === head && evt.event !== "unlink",
Effect.promise(() => fs.writeFile(head, `ref: refs/heads/${branch}\n`)),
).pipe(
Effect.tap((evt) =>
Effect.sync(() => {
expect(evt.file).toBe(head)
expect(["add", "change"]).toContain(evt.event)
}),
),
),
)
})
})

View File

@@ -0,0 +1,47 @@
import { ConfigProvider, Layer, ManagedRuntime } from "effect"
import { InstanceContext } from "../../src/effect/instance-context"
import { Instance } from "../../src/project/instance"
/** ConfigProvider that enables the experimental file watcher. */
export const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
/**
* Boot an Instance with the given service layers and run `body` with
* the ManagedRuntime. Cleanup is automatic — the runtime is disposed
* and Instance context is torn down when `body` completes.
*
* Layers may depend on InstanceContext (provided automatically).
* Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
*/
export function withServices<S>(
directory: string,
layer: Layer.Layer<S, any, InstanceContext>,
body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
options?: { provide?: Layer.Layer<never>[] },
) {
return Instance.provide({
directory,
fn: async () => {
const ctx = Layer.sync(InstanceContext, () =>
InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
)
let resolved: Layer.Layer<S> = Layer.fresh(layer).pipe(Layer.provide(ctx)) as any
if (options?.provide) {
for (const l of options.provide) {
resolved = resolved.pipe(Layer.provide(l)) as any
}
}
const rt = ManagedRuntime.make(resolved)
try {
await body(rt)
} finally {
await rt.dispose()
}
},
})
}

View File

@@ -977,7 +977,7 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
await Instance.provide({
directory: tmp.path,
fn: async () => {
const ask = PermissionNext.ask({
const err = await PermissionNext.ask({
sessionID: SessionID.make("session_test"),
permission: "bash",
patterns: ["echo hello", "rm -rf /"],
@@ -987,24 +987,12 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
{ permission: "bash", pattern: "echo *", action: "ask" },
{ permission: "bash", pattern: "rm *", action: "deny" },
],
})
}).then(
() => undefined,
(err) => err,
)
const out = await Promise.race([
ask.then(
() => ({ ok: true as const, err: undefined }),
(err) => ({ ok: false as const, err }),
),
Bun.sleep(100).then(() => "timeout" as const),
])
if (out === "timeout") {
await rejectAll()
await ask.catch(() => {})
throw new Error("ask timed out instead of denying immediately")
}
expect(out.ok).toBe(false)
expect(out.err).toBeInstanceOf(PermissionNext.DeniedError)
expect(err).toBeInstanceOf(PermissionNext.DeniedError)
expect(await PermissionNext.list()).toHaveLength(0)
},
})

View File

@@ -0,0 +1,117 @@
import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Layer, ManagedRuntime } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
import { Vcs, VcsService } from "../../src/project/vcs"
// Skip in CI — native @parcel/watcher binding needed
const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function withVcs(
directory: string,
body: (rt: ManagedRuntime.ManagedRuntime<FileWatcherService | VcsService, never>) => Promise<void>,
) {
return withServices(
directory,
Layer.merge(FileWatcherService.layer, VcsService.layer),
async (rt) => {
await rt.runPromise(FileWatcherService.use((s) => s.init()))
await rt.runPromise(VcsService.use((s) => s.init()))
await Bun.sleep(200)
await body(rt)
},
{ provide: [watcherConfigLayer] },
)
}
type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
/** Wait for a Vcs.Event.BranchUpdated event on GlobalBus */
function nextBranchUpdate(directory: string, timeout = 5000) {
return new Promise<string | undefined>((resolve, reject) => {
const timer = setTimeout(() => {
GlobalBus.off("event", on)
reject(new Error("timed out waiting for BranchUpdated event"))
}, timeout)
function on(evt: BranchEvent) {
if (evt.directory !== directory) return
if (evt.payload.type !== Vcs.Event.BranchUpdated.type) return
clearTimeout(timer)
GlobalBus.off("event", on)
resolve(evt.payload.properties.branch)
}
GlobalBus.on("event", on)
})
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describeVcs("Vcs", () => {
afterEach(() => Instance.disposeAll())
test("branch() returns current branch name", async () => {
await using tmp = await tmpdir({ git: true })
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(branch).toBeDefined()
expect(typeof branch).toBe("string")
})
})
test("branch() returns undefined for non-git directories", async () => {
await using tmp = await tmpdir()
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(branch).toBeUndefined()
})
})
test("publishes BranchUpdated when .git/HEAD changes", async () => {
await using tmp = await tmpdir({ git: true })
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async () => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
const updated = await pending
expect(updated).toBe(branch)
})
})
test("branch() reflects the new branch after HEAD change", async () => {
await using tmp = await tmpdir({ git: true })
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async (rt) => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
await pending
const current = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(current).toBe(branch)
})
})
})

View File

@@ -6,7 +6,7 @@ import type { PtyID } from "../../src/pty/schema"
import { tmpdir } from "../fixture/fixture"
import { setTimeout as sleep } from "node:timers/promises"
const wait = async (fn: () => boolean, ms = 2000) => {
const wait = async (fn: () => boolean, ms = 5000) => {
const end = Date.now() + ms
while (Date.now() < end) {
if (fn()) return
@@ -20,7 +20,7 @@ const pick = (log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }>,
}
describe("pty", () => {
test("publishes created, exited, deleted in order for /bin/ls + remove", async () => {
test("publishes created, exited, deleted in order for a short-lived process", async () => {
if (process.platform === "win32") return
await using dir = await tmpdir({ git: true })
@@ -37,7 +37,11 @@ describe("pty", () => {
let id: PtyID | undefined
try {
const info = await Pty.create({ command: "/bin/ls", title: "ls" })
const info = await Pty.create({
command: "/usr/bin/env",
args: ["sh", "-c", "sleep 0.1"],
title: "sleep",
})
id = info.id
await wait(() => pick(log, id!).includes("exited"))