mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-15 19:24:20 +00:00
Compare commits
17 Commits
dev
...
effectify-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3701344cce | ||
|
|
1d6d525c0f | ||
|
|
701d6c4374 | ||
|
|
f5c81a1343 | ||
|
|
247bd3d0db | ||
|
|
0f4df1a1b3 | ||
|
|
59cf7e5848 | ||
|
|
330f315d20 | ||
|
|
3e532b2ac8 | ||
|
|
cea3f2c924 | ||
|
|
cb5372c4e8 | ||
|
|
4b92f9c55e | ||
|
|
2cbd2a5a2d | ||
|
|
c35dc2245a | ||
|
|
ac4a807e6f | ||
|
|
219c7f728a | ||
|
|
2d088ab108 |
@@ -34,6 +34,7 @@ Instructions to follow when writing Effect.
|
||||
- Use `Effect.gen(function* () { ... })` for composition.
|
||||
- Use `Effect.fn("ServiceName.method")` for named/traced effects and `Effect.fnUntraced` for internal helpers.
|
||||
- `Effect.fn` / `Effect.fnUntraced` accept pipeable operators as extra arguments, so avoid unnecessary `flow` or outer `.pipe()` wrappers.
|
||||
- **`Effect.callback`** (not `Effect.async`) for callback-based APIs. The classic `Effect.async` was renamed to `Effect.callback` in effect-smol/v4.
|
||||
|
||||
## Time
|
||||
|
||||
@@ -42,3 +43,37 @@ Instructions to follow when writing Effect.
|
||||
## Errors
|
||||
|
||||
- In `Effect.gen/fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
|
||||
|
||||
## Instance-scoped Effect services
|
||||
|
||||
Services that need per-directory lifecycle (created/destroyed per instance) go through the `Instances` LayerMap:
|
||||
|
||||
1. Define a `ServiceMap.Service` with a `static readonly layer` (see `FileWatcherService`, `QuestionService`, `PermissionService`, `ProviderAuthService`).
|
||||
2. Add it to `InstanceServices` union and `Layer.mergeAll(...)` in `src/effect/instances.ts`.
|
||||
3. Use `InstanceContext` inside the layer to read `directory` and `project` instead of `Instance.*` globals.
|
||||
4. Call from legacy code via `runPromiseInstance(MyService.use((s) => s.method()))`.
|
||||
|
||||
### Instance.bind — ALS context for native callbacks
|
||||
|
||||
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and returns a wrapper that restores it synchronously when called.
|
||||
|
||||
**Use it** when passing callbacks to native C/C++ addons (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
|
||||
|
||||
**Don't need it** for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers — Node.js ALS propagates through those automatically.
|
||||
|
||||
```typescript
|
||||
// Native addon callback — needs Instance.bind
|
||||
const cb = Instance.bind((err, evts) => {
|
||||
Bus.publish(MyEvent, { ... })
|
||||
})
|
||||
nativeAddon.subscribe(dir, cb)
|
||||
```
|
||||
|
||||
## Flag → Effect.Config migration
|
||||
|
||||
Flags in `src/flag/flag.ts` are being migrated from static `truthy(...)` reads to `Config.boolean(...).pipe(Config.withDefault(false))` as their consumers get effectified.
|
||||
|
||||
- Effectful flags return `Config<boolean>` and are read with `yield*` inside `Effect.gen`.
|
||||
- The default `ConfigProvider` reads from `process.env`, so env vars keep working.
|
||||
- Tests can override via `ConfigProvider.layer(ConfigProvider.fromUnknown({ ... }))`.
|
||||
- Keep all flags in `flag.ts` as the single registry — just change the implementation from `truthy()` to `Config.boolean()` when the consumer moves to Effect.
|
||||
|
||||
12
packages/opencode/src/effect/instance-registry.ts
Normal file
12
packages/opencode/src/effect/instance-registry.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
const disposers = new Set<(directory: string) => Promise<void>>()
|
||||
|
||||
export function registerDisposer(disposer: (directory: string) => Promise<void>) {
|
||||
disposers.add(disposer)
|
||||
return () => {
|
||||
disposers.delete(disposer)
|
||||
}
|
||||
}
|
||||
|
||||
export async function disposeInstance(directory: string) {
|
||||
await Promise.allSettled([...disposers].map((disposer) => disposer(directory)))
|
||||
}
|
||||
54
packages/opencode/src/effect/instances.ts
Normal file
54
packages/opencode/src/effect/instances.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
|
||||
import { registerDisposer } from "./instance-registry"
|
||||
import { ProviderAuthService } from "@/provider/auth-service"
|
||||
import { QuestionService } from "@/question/service"
|
||||
import { PermissionService } from "@/permission/service"
|
||||
import { FileWatcherService } from "@/file/watcher"
|
||||
import { Instance } from "@/project/instance"
|
||||
import type { Project } from "@/project/project"
|
||||
|
||||
export declare namespace InstanceContext {
|
||||
export interface Shape {
|
||||
readonly directory: string
|
||||
readonly project: Project.Info
|
||||
}
|
||||
}
|
||||
|
||||
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
|
||||
"opencode/InstanceContext",
|
||||
) {}
|
||||
|
||||
export type InstanceServices = QuestionService | PermissionService | ProviderAuthService | FileWatcherService
|
||||
|
||||
function lookup(directory: string) {
|
||||
const project = Instance.project
|
||||
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of({ directory, project }))
|
||||
return Layer.mergeAll(
|
||||
Layer.fresh(QuestionService.layer),
|
||||
Layer.fresh(PermissionService.layer),
|
||||
Layer.fresh(ProviderAuthService.layer),
|
||||
Layer.fresh(FileWatcherService.layer),
|
||||
).pipe(Layer.provide(ctx))
|
||||
}
|
||||
|
||||
export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<string, InstanceServices>>()(
|
||||
"opencode/Instances",
|
||||
) {
|
||||
static readonly layer = Layer.effect(
|
||||
Instances,
|
||||
Effect.gen(function* () {
|
||||
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity })
|
||||
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory)))
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unregister))
|
||||
return Instances.of(layerMap)
|
||||
}),
|
||||
)
|
||||
|
||||
static get(directory: string): Layer.Layer<InstanceServices, never, Instances> {
|
||||
return Layer.unwrap(Instances.use((map) => Effect.succeed(map.get(directory))))
|
||||
}
|
||||
|
||||
static invalidate(directory: string): Effect.Effect<void, never, Instances> {
|
||||
return Instances.use((map) => map.invalidate(directory))
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,14 @@
|
||||
import { Layer, ManagedRuntime } from "effect"
|
||||
import { Effect, Layer, ManagedRuntime } from "effect"
|
||||
import { AccountService } from "@/account/service"
|
||||
import { AuthService } from "@/auth/service"
|
||||
import { PermissionService } from "@/permission/service"
|
||||
import { QuestionService } from "@/question/service"
|
||||
import { Instances } from "@/effect/instances"
|
||||
import type { InstanceServices } from "@/effect/instances"
|
||||
import { Instance } from "@/project/instance"
|
||||
|
||||
export const runtime = ManagedRuntime.make(
|
||||
Layer.mergeAll(AccountService.defaultLayer, AuthService.defaultLayer, PermissionService.layer, QuestionService.layer),
|
||||
Layer.mergeAll(AccountService.defaultLayer, Instances.layer).pipe(Layer.provideMerge(AuthService.defaultLayer)),
|
||||
)
|
||||
|
||||
export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
|
||||
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { InstanceContext } from "@/effect/instances"
|
||||
import { Instance } from "@/project/instance"
|
||||
import z from "zod"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Log } from "../util/log"
|
||||
import { FileIgnore } from "./ignore"
|
||||
import { Config } from "../config/config"
|
||||
@@ -9,118 +10,139 @@ import path from "path"
|
||||
// @ts-ignore
|
||||
import { createWrapper } from "@parcel/watcher/wrapper"
|
||||
import { lazy } from "@/util/lazy"
|
||||
import { withTimeout } from "@/util/timeout"
|
||||
import type ParcelWatcher from "@parcel/watcher"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { readdir } from "fs/promises"
|
||||
import { git } from "@/util/git"
|
||||
import { Protected } from "./protected"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { Cause, Effect, Layer, ServiceMap } from "effect"
|
||||
|
||||
const SUBSCRIBE_TIMEOUT_MS = 10_000
|
||||
|
||||
declare const OPENCODE_LIBC: string | undefined
|
||||
|
||||
export namespace FileWatcher {
|
||||
const log = Log.create({ service: "file.watcher" })
|
||||
const log = Log.create({ service: "file.watcher" })
|
||||
|
||||
export const Event = {
|
||||
Updated: BusEvent.define(
|
||||
"file.watcher.updated",
|
||||
z.object({
|
||||
file: z.string(),
|
||||
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
|
||||
}),
|
||||
),
|
||||
const event = {
|
||||
Updated: BusEvent.define(
|
||||
"file.watcher.updated",
|
||||
z.object({
|
||||
file: z.string(),
|
||||
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
|
||||
}),
|
||||
),
|
||||
}
|
||||
|
||||
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
|
||||
try {
|
||||
const binding = require(
|
||||
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
|
||||
)
|
||||
return createWrapper(binding) as typeof import("@parcel/watcher")
|
||||
} catch (error) {
|
||||
log.error("failed to load watcher binding", { error })
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
|
||||
try {
|
||||
const binding = require(
|
||||
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
|
||||
)
|
||||
return createWrapper(binding) as typeof import("@parcel/watcher")
|
||||
} catch (error) {
|
||||
log.error("failed to load watcher binding", { error })
|
||||
return
|
||||
}
|
||||
})
|
||||
function getBackend() {
|
||||
if (process.platform === "win32") return "windows"
|
||||
if (process.platform === "darwin") return "fs-events"
|
||||
if (process.platform === "linux") return "inotify"
|
||||
}
|
||||
|
||||
const state = Instance.state(
|
||||
async () => {
|
||||
log.info("init")
|
||||
const cfg = await Config.get()
|
||||
const backend = (() => {
|
||||
if (process.platform === "win32") return "windows"
|
||||
if (process.platform === "darwin") return "fs-events"
|
||||
if (process.platform === "linux") return "inotify"
|
||||
})()
|
||||
if (!backend) {
|
||||
log.error("watcher backend not supported", { platform: process.platform })
|
||||
return {}
|
||||
}
|
||||
log.info("watcher backend", { platform: process.platform, backend })
|
||||
export namespace FileWatcher {
|
||||
export const Event = event
|
||||
/** Whether the native @parcel/watcher binding is available on this platform. */
|
||||
export const hasNativeBinding = () => !!watcher()
|
||||
}
|
||||
|
||||
const w = watcher()
|
||||
if (!w) return {}
|
||||
const init = Effect.fn("FileWatcherService.init")(function* () {})
|
||||
|
||||
const subscribe: ParcelWatcher.SubscribeCallback = (err, evts) => {
|
||||
if (err) return
|
||||
for (const evt of evts) {
|
||||
if (evt.type === "create") Bus.publish(Event.Updated, { file: evt.path, event: "add" })
|
||||
if (evt.type === "update") Bus.publish(Event.Updated, { file: evt.path, event: "change" })
|
||||
if (evt.type === "delete") Bus.publish(Event.Updated, { file: evt.path, event: "unlink" })
|
||||
}
|
||||
}
|
||||
|
||||
const subs: ParcelWatcher.AsyncSubscription[] = []
|
||||
const cfgIgnores = cfg.watcher?.ignore ?? []
|
||||
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
|
||||
const pending = w.subscribe(Instance.directory, subscribe, {
|
||||
ignore: [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()],
|
||||
backend,
|
||||
})
|
||||
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
|
||||
log.error("failed to subscribe to Instance.directory", { error: err })
|
||||
pending.then((s) => s.unsubscribe()).catch(() => {})
|
||||
return undefined
|
||||
})
|
||||
if (sub) subs.push(sub)
|
||||
}
|
||||
|
||||
if (Instance.project.vcs === "git") {
|
||||
const result = await git(["rev-parse", "--git-dir"], {
|
||||
cwd: Instance.worktree,
|
||||
})
|
||||
const vcsDir = result.exitCode === 0 ? path.resolve(Instance.worktree, result.text().trim()) : undefined
|
||||
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
|
||||
const gitDirContents = await readdir(vcsDir).catch(() => [])
|
||||
const ignoreList = gitDirContents.filter((entry) => entry !== "HEAD")
|
||||
const pending = w.subscribe(vcsDir, subscribe, {
|
||||
ignore: ignoreList,
|
||||
backend,
|
||||
})
|
||||
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
|
||||
log.error("failed to subscribe to vcsDir", { error: err })
|
||||
pending.then((s) => s.unsubscribe()).catch(() => {})
|
||||
return undefined
|
||||
})
|
||||
if (sub) subs.push(sub)
|
||||
}
|
||||
}
|
||||
|
||||
return { subs }
|
||||
},
|
||||
async (state) => {
|
||||
if (!state.subs) return
|
||||
await Promise.all(state.subs.map((sub) => sub?.unsubscribe()))
|
||||
},
|
||||
)
|
||||
|
||||
export function init() {
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) {
|
||||
return
|
||||
}
|
||||
state()
|
||||
export namespace FileWatcherService {
|
||||
export interface Service {
|
||||
readonly init: () => Effect.Effect<void>
|
||||
}
|
||||
}
|
||||
|
||||
export class FileWatcherService extends ServiceMap.Service<FileWatcherService, FileWatcherService.Service>()(
|
||||
"@opencode/FileWatcher",
|
||||
) {
|
||||
static readonly layer = Layer.effect(
|
||||
FileWatcherService,
|
||||
Effect.gen(function* () {
|
||||
const instance = yield* InstanceContext
|
||||
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
|
||||
|
||||
log.info("init", { directory: instance.directory })
|
||||
|
||||
const backend = getBackend()
|
||||
if (!backend) {
|
||||
log.error("watcher backend not supported", { directory: instance.directory, platform: process.platform })
|
||||
return FileWatcherService.of({ init })
|
||||
}
|
||||
|
||||
const w = watcher()
|
||||
if (!w) return FileWatcherService.of({ init })
|
||||
|
||||
log.info("watcher backend", { directory: instance.directory, platform: process.platform, backend })
|
||||
|
||||
const subs: ParcelWatcher.AsyncSubscription[] = []
|
||||
yield* Effect.addFinalizer(() => Effect.promise(() => Promise.allSettled(subs.map((sub) => sub.unsubscribe()))))
|
||||
|
||||
const cb: ParcelWatcher.SubscribeCallback = Instance.bind((err, evts) => {
|
||||
if (err) return
|
||||
for (const evt of evts) {
|
||||
if (evt.type === "create") Bus.publish(event.Updated, { file: evt.path, event: "add" })
|
||||
if (evt.type === "update") Bus.publish(event.Updated, { file: evt.path, event: "change" })
|
||||
if (evt.type === "delete") Bus.publish(event.Updated, { file: evt.path, event: "unlink" })
|
||||
}
|
||||
})
|
||||
|
||||
const subscribe = (dir: string, ignore: string[]) => {
|
||||
const pending = w.subscribe(dir, cb, { ignore, backend })
|
||||
return Effect.gen(function* () {
|
||||
const sub = yield* Effect.promise(() => pending)
|
||||
subs.push(sub)
|
||||
}).pipe(
|
||||
Effect.timeout(SUBSCRIBE_TIMEOUT_MS),
|
||||
Effect.catchCause((cause) => {
|
||||
log.error("failed to subscribe", { dir, cause: Cause.pretty(cause) })
|
||||
// Clean up a subscription that resolves after timeout
|
||||
pending.then((s) => s.unsubscribe()).catch(() => {})
|
||||
return Effect.void
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const cfg = yield* Effect.promise(() => Config.get())
|
||||
const cfgIgnores = cfg.watcher?.ignore ?? []
|
||||
|
||||
if (yield* Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
|
||||
yield* subscribe(instance.directory, [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()])
|
||||
}
|
||||
|
||||
if (instance.project.vcs === "git") {
|
||||
const result = yield* Effect.promise(() =>
|
||||
git(["rev-parse", "--git-dir"], {
|
||||
cwd: instance.project.worktree,
|
||||
}),
|
||||
)
|
||||
const vcsDir = result.exitCode === 0 ? path.resolve(instance.project.worktree, result.text().trim()) : undefined
|
||||
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
|
||||
const ignore = (yield* Effect.promise(() => readdir(vcsDir).catch(() => []))).filter(
|
||||
(entry) => entry !== "HEAD",
|
||||
)
|
||||
yield* subscribe(vcsDir, ignore)
|
||||
}
|
||||
}
|
||||
|
||||
return FileWatcherService.of({ init })
|
||||
}).pipe(
|
||||
Effect.catchCause((cause) => {
|
||||
log.error("failed to init watcher service", { cause: Cause.pretty(cause) })
|
||||
return Effect.succeed(FileWatcherService.of({ init }))
|
||||
}),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { Config } from "effect"
|
||||
|
||||
function truthy(key: string) {
|
||||
const value = process.env[key]?.toLowerCase()
|
||||
return value === "true" || value === "1"
|
||||
@@ -40,8 +42,12 @@ export namespace Flag {
|
||||
|
||||
// Experimental
|
||||
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
|
||||
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_FILEWATCHER")
|
||||
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER")
|
||||
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = Config.boolean("OPENCODE_EXPERIMENTAL_FILEWATCHER").pipe(
|
||||
Config.withDefault(false),
|
||||
)
|
||||
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = Config.boolean(
|
||||
"OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER",
|
||||
).pipe(Config.withDefault(false))
|
||||
export const OPENCODE_EXPERIMENTAL_ICON_DISCOVERY =
|
||||
OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_ICON_DISCOVERY")
|
||||
|
||||
|
||||
@@ -1,18 +1,9 @@
|
||||
import { runtime } from "@/effect/runtime"
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
import { Config } from "@/config/config"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Wildcard } from "@/util/wildcard"
|
||||
import { Effect } from "effect"
|
||||
import os from "os"
|
||||
import * as S from "./service"
|
||||
import type {
|
||||
Action as ActionType,
|
||||
PermissionError,
|
||||
Reply as ReplyType,
|
||||
Request as RequestType,
|
||||
Rule as RuleType,
|
||||
Ruleset as RulesetType,
|
||||
} from "./service"
|
||||
|
||||
export namespace PermissionNext {
|
||||
function expand(pattern: string): string {
|
||||
@@ -23,20 +14,16 @@ export namespace PermissionNext {
|
||||
return pattern
|
||||
}
|
||||
|
||||
function runPromise<A>(f: (service: S.PermissionService.Api) => Effect.Effect<A, PermissionError>) {
|
||||
return runtime.runPromise(S.PermissionService.use(f))
|
||||
}
|
||||
|
||||
export const Action = S.Action
|
||||
export type Action = ActionType
|
||||
export type Action = S.Action
|
||||
export const Rule = S.Rule
|
||||
export type Rule = RuleType
|
||||
export type Rule = S.Rule
|
||||
export const Ruleset = S.Ruleset
|
||||
export type Ruleset = RulesetType
|
||||
export type Ruleset = S.Ruleset
|
||||
export const Request = S.Request
|
||||
export type Request = RequestType
|
||||
export type Request = S.Request
|
||||
export const Reply = S.Reply
|
||||
export type Reply = ReplyType
|
||||
export type Reply = S.Reply
|
||||
export const Approval = S.Approval
|
||||
export const Event = S.Event
|
||||
export const Service = S.PermissionService
|
||||
@@ -66,12 +53,16 @@ export namespace PermissionNext {
|
||||
return rulesets.flat()
|
||||
}
|
||||
|
||||
export const ask = fn(S.AskInput, async (input) => runPromise((service) => service.ask(input)))
|
||||
export const ask = fn(S.AskInput, async (input) =>
|
||||
runPromiseInstance(S.PermissionService.use((service) => service.ask(input))),
|
||||
)
|
||||
|
||||
export const reply = fn(S.ReplyInput, async (input) => runPromise((service) => service.reply(input)))
|
||||
export const reply = fn(S.ReplyInput, async (input) =>
|
||||
runPromiseInstance(S.PermissionService.use((service) => service.reply(input))),
|
||||
)
|
||||
|
||||
export async function list() {
|
||||
return runPromise((service) => service.list())
|
||||
return runPromiseInstance(S.PermissionService.use((service) => service.list()))
|
||||
}
|
||||
|
||||
export function evaluate(permission: string, pattern: string, ...rulesets: Ruleset[]): Rule {
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import { Bus } from "@/bus"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { InstanceContext } from "@/effect/instances"
|
||||
import { ProjectID } from "@/project/schema"
|
||||
import { MessageID, SessionID } from "@/session/schema"
|
||||
import { PermissionTable } from "@/session/session.sql"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { InstanceState } from "@/util/instance-state"
|
||||
import { Log } from "@/util/log"
|
||||
import { Wildcard } from "@/util/wildcard"
|
||||
import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
|
||||
@@ -104,11 +103,6 @@ interface PendingEntry {
|
||||
deferred: Deferred.Deferred<void, RejectedError | CorrectedError>
|
||||
}
|
||||
|
||||
type State = {
|
||||
pending: Map<PermissionID, PendingEntry>
|
||||
approved: Ruleset
|
||||
}
|
||||
|
||||
export const AskInput = Request.partial({ id: true }).extend({
|
||||
ruleset: Ruleset,
|
||||
})
|
||||
@@ -133,25 +127,19 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
static readonly layer = Layer.effect(
|
||||
PermissionService,
|
||||
Effect.gen(function* () {
|
||||
const instanceState = yield* InstanceState.make<State>(() =>
|
||||
Effect.sync(() => {
|
||||
const row = Database.use((db) =>
|
||||
db.select().from(PermissionTable).where(eq(PermissionTable.project_id, Instance.project.id)).get(),
|
||||
)
|
||||
return {
|
||||
pending: new Map<PermissionID, PendingEntry>(),
|
||||
approved: row?.data ?? [],
|
||||
}
|
||||
}),
|
||||
const { project } = yield* InstanceContext
|
||||
const row = Database.use((db) =>
|
||||
db.select().from(PermissionTable).where(eq(PermissionTable.project_id, project.id)).get(),
|
||||
)
|
||||
const pending = new Map<PermissionID, PendingEntry>()
|
||||
const approved: Ruleset = row?.data ?? []
|
||||
|
||||
const ask = Effect.fn("PermissionService.ask")(function* (input: z.infer<typeof AskInput>) {
|
||||
const state = yield* InstanceState.get(instanceState)
|
||||
const { ruleset, ...request } = input
|
||||
let pending = false
|
||||
let needsAsk = false
|
||||
|
||||
for (const pattern of request.patterns) {
|
||||
const rule = evaluate(request.permission, pattern, ruleset, state.approved)
|
||||
const rule = evaluate(request.permission, pattern, ruleset, approved)
|
||||
log.info("evaluated", { permission: request.permission, pattern, action: rule })
|
||||
if (rule.action === "deny") {
|
||||
return yield* new DeniedError({
|
||||
@@ -159,10 +147,10 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
})
|
||||
}
|
||||
if (rule.action === "allow") continue
|
||||
pending = true
|
||||
needsAsk = true
|
||||
}
|
||||
|
||||
if (!pending) return
|
||||
if (!needsAsk) return
|
||||
|
||||
const id = request.id ?? PermissionID.ascending()
|
||||
const info: Request = {
|
||||
@@ -172,22 +160,21 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
log.info("asking", { id, permission: info.permission, patterns: info.patterns })
|
||||
|
||||
const deferred = yield* Deferred.make<void, RejectedError | CorrectedError>()
|
||||
state.pending.set(id, { info, deferred })
|
||||
pending.set(id, { info, deferred })
|
||||
void Bus.publish(Event.Asked, info)
|
||||
return yield* Effect.ensuring(
|
||||
Deferred.await(deferred),
|
||||
Effect.sync(() => {
|
||||
state.pending.delete(id)
|
||||
pending.delete(id)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
const reply = Effect.fn("PermissionService.reply")(function* (input: z.infer<typeof ReplyInput>) {
|
||||
const state = yield* InstanceState.get(instanceState)
|
||||
const existing = state.pending.get(input.requestID)
|
||||
const existing = pending.get(input.requestID)
|
||||
if (!existing) return
|
||||
|
||||
state.pending.delete(input.requestID)
|
||||
pending.delete(input.requestID)
|
||||
void Bus.publish(Event.Replied, {
|
||||
sessionID: existing.info.sessionID,
|
||||
requestID: existing.info.id,
|
||||
@@ -200,9 +187,9 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
input.message ? new CorrectedError({ feedback: input.message }) : new RejectedError(),
|
||||
)
|
||||
|
||||
for (const [id, item] of state.pending.entries()) {
|
||||
for (const [id, item] of pending.entries()) {
|
||||
if (item.info.sessionID !== existing.info.sessionID) continue
|
||||
state.pending.delete(id)
|
||||
pending.delete(id)
|
||||
void Bus.publish(Event.Replied, {
|
||||
sessionID: item.info.sessionID,
|
||||
requestID: item.info.id,
|
||||
@@ -217,20 +204,20 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
if (input.reply === "once") return
|
||||
|
||||
for (const pattern of existing.info.always) {
|
||||
state.approved.push({
|
||||
approved.push({
|
||||
permission: existing.info.permission,
|
||||
pattern,
|
||||
action: "allow",
|
||||
})
|
||||
}
|
||||
|
||||
for (const [id, item] of state.pending.entries()) {
|
||||
for (const [id, item] of pending.entries()) {
|
||||
if (item.info.sessionID !== existing.info.sessionID) continue
|
||||
const ok = item.info.patterns.every(
|
||||
(pattern) => evaluate(item.info.permission, pattern, state.approved).action === "allow",
|
||||
(pattern) => evaluate(item.info.permission, pattern, approved).action === "allow",
|
||||
)
|
||||
if (!ok) continue
|
||||
state.pending.delete(id)
|
||||
pending.delete(id)
|
||||
void Bus.publish(Event.Replied, {
|
||||
sessionID: item.info.sessionID,
|
||||
requestID: item.info.id,
|
||||
@@ -246,8 +233,7 @@ export class PermissionService extends ServiceMap.Service<PermissionService, Per
|
||||
})
|
||||
|
||||
const list = Effect.fn("PermissionService.list")(function* () {
|
||||
const state = yield* InstanceState.get(instanceState)
|
||||
return Array.from(state.pending.values(), (item) => item.info)
|
||||
return Array.from(pending.values(), (item) => item.info)
|
||||
})
|
||||
|
||||
return PermissionService.of({ ask, reply, list })
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { Plugin } from "../plugin"
|
||||
import { Format } from "../format"
|
||||
import { LSP } from "../lsp"
|
||||
import { FileWatcher } from "../file/watcher"
|
||||
import { FileWatcherService } from "../file/watcher"
|
||||
import { File } from "../file"
|
||||
import { Project } from "./project"
|
||||
import { Bus } from "../bus"
|
||||
@@ -12,6 +12,7 @@ import { Log } from "@/util/log"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { Snapshot } from "../snapshot"
|
||||
import { Truncate } from "../tool/truncation"
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
|
||||
export async function InstanceBootstrap() {
|
||||
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
||||
@@ -19,7 +20,7 @@ export async function InstanceBootstrap() {
|
||||
ShareNext.init()
|
||||
Format.init()
|
||||
await LSP.init()
|
||||
FileWatcher.init()
|
||||
await runPromiseInstance(FileWatcherService.use((service) => service.init()))
|
||||
File.init()
|
||||
Vcs.init()
|
||||
Snapshot.init()
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { Effect } from "effect"
|
||||
import { Log } from "@/util/log"
|
||||
import { Context } from "../util/context"
|
||||
import { Project } from "./project"
|
||||
@@ -6,7 +5,7 @@ import { State } from "./state"
|
||||
import { iife } from "@/util/iife"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { InstanceState } from "@/util/instance-state"
|
||||
import { disposeInstance } from "@/effect/instance-registry"
|
||||
|
||||
interface Context {
|
||||
directory: string
|
||||
@@ -102,23 +101,33 @@ export const Instance = {
|
||||
if (Instance.worktree === "/") return false
|
||||
return Filesystem.contains(Instance.worktree, filepath)
|
||||
},
|
||||
/**
|
||||
* Captures the current instance ALS context and returns a wrapper that
|
||||
* restores it when called. Use this for callbacks that fire outside the
|
||||
* instance async context (native addons, event emitters, timers, etc.).
|
||||
*/
|
||||
bind<F extends (...args: any[]) => any>(fn: F): F {
|
||||
const ctx = context.use()
|
||||
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
|
||||
},
|
||||
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
|
||||
return State.create(() => Instance.directory, init, dispose)
|
||||
},
|
||||
async reload(input: { directory: string; init?: () => Promise<any>; project?: Project.Info; worktree?: string }) {
|
||||
const directory = Filesystem.resolve(input.directory)
|
||||
Log.Default.info("reloading instance", { directory })
|
||||
await Promise.all([State.dispose(directory), Effect.runPromise(InstanceState.dispose(directory))])
|
||||
await Promise.all([State.dispose(directory), disposeInstance(directory)])
|
||||
cache.delete(directory)
|
||||
const next = track(directory, boot({ ...input, directory }))
|
||||
emit(directory)
|
||||
return await next
|
||||
},
|
||||
async dispose() {
|
||||
Log.Default.info("disposing instance", { directory: Instance.directory })
|
||||
await Promise.all([State.dispose(Instance.directory), Effect.runPromise(InstanceState.dispose(Instance.directory))])
|
||||
cache.delete(Instance.directory)
|
||||
emit(Instance.directory)
|
||||
const directory = Instance.directory
|
||||
Log.Default.info("disposing instance", { directory })
|
||||
await Promise.all([State.dispose(directory), disposeInstance(directory)])
|
||||
cache.delete(directory)
|
||||
emit(directory)
|
||||
},
|
||||
async disposeAll() {
|
||||
if (disposal.all) return disposal.all
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
import { Effect, Layer, Record, ServiceMap, Struct } from "effect"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Plugin } from "../plugin"
|
||||
import { filter, fromEntries, map, pipe } from "remeda"
|
||||
import type { AuthOuathResult } from "@opencode-ai/plugin"
|
||||
import { NamedError } from "@opencode-ai/util/error"
|
||||
import * as Auth from "@/auth/service"
|
||||
import { InstanceState } from "@/util/instance-state"
|
||||
import { ProviderID } from "./schema"
|
||||
import { Effect, Layer, Record, ServiceMap, Struct } from "effect"
|
||||
import { filter, fromEntries, map, pipe } from "remeda"
|
||||
import z from "zod"
|
||||
|
||||
export const Method = z
|
||||
@@ -54,21 +51,13 @@ export type ProviderAuthError =
|
||||
|
||||
export namespace ProviderAuthService {
|
||||
export interface Service {
|
||||
/** Get available auth methods for each provider (e.g. OAuth, API key). */
|
||||
readonly methods: () => Effect.Effect<Record<string, Method[]>>
|
||||
|
||||
/** Start an OAuth authorization flow for a provider. Returns the URL to redirect to. */
|
||||
readonly authorize: (input: { providerID: ProviderID; method: number }) => Effect.Effect<Authorization | undefined>
|
||||
|
||||
/** Complete an OAuth flow after the user has authorized. Exchanges the code/callback for credentials. */
|
||||
readonly callback: (input: {
|
||||
providerID: ProviderID
|
||||
method: number
|
||||
code?: string
|
||||
}) => Effect.Effect<void, ProviderAuthError>
|
||||
|
||||
/** Set an API key directly for a provider (no OAuth flow). */
|
||||
readonly api: (input: { providerID: ProviderID; key: string }) => Effect.Effect<void, Auth.AuthServiceError>
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,32 +68,29 @@ export class ProviderAuthService extends ServiceMap.Service<ProviderAuthService,
|
||||
ProviderAuthService,
|
||||
Effect.gen(function* () {
|
||||
const auth = yield* Auth.AuthService
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.promise(async () => {
|
||||
const methods = pipe(
|
||||
await Plugin.list(),
|
||||
filter((x) => x.auth?.provider !== undefined),
|
||||
map((x) => [x.auth!.provider, x.auth!] as const),
|
||||
fromEntries(),
|
||||
)
|
||||
return { methods, pending: new Map<ProviderID, AuthOuathResult>() }
|
||||
}),
|
||||
)
|
||||
const hooks = yield* Effect.promise(async () => {
|
||||
const mod = await import("../plugin")
|
||||
return pipe(
|
||||
await mod.Plugin.list(),
|
||||
filter((x) => x.auth?.provider !== undefined),
|
||||
map((x) => [x.auth!.provider, x.auth!] as const),
|
||||
fromEntries(),
|
||||
)
|
||||
})
|
||||
const pending = new Map<ProviderID, AuthOuathResult>()
|
||||
|
||||
const methods = Effect.fn("ProviderAuthService.methods")(function* () {
|
||||
const x = yield* InstanceState.get(state)
|
||||
return Record.map(x.methods, (y) => y.methods.map((z): Method => Struct.pick(z, ["type", "label"])))
|
||||
return Record.map(hooks, (item) => item.methods.map((method): Method => Struct.pick(method, ["type", "label"])))
|
||||
})
|
||||
|
||||
const authorize = Effect.fn("ProviderAuthService.authorize")(function* (input: {
|
||||
providerID: ProviderID
|
||||
method: number
|
||||
}) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const method = s.methods[input.providerID].methods[input.method]
|
||||
const method = hooks[input.providerID].methods[input.method]
|
||||
if (method.type !== "oauth") return
|
||||
const result = yield* Effect.promise(() => method.authorize())
|
||||
s.pending.set(input.providerID, result)
|
||||
pending.set(input.providerID, result)
|
||||
return {
|
||||
url: result.url,
|
||||
method: result.method,
|
||||
@@ -117,17 +103,14 @@ export class ProviderAuthService extends ServiceMap.Service<ProviderAuthService,
|
||||
method: number
|
||||
code?: string
|
||||
}) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const match = s.pending.get(input.providerID)
|
||||
const match = pending.get(input.providerID)
|
||||
if (!match) return yield* Effect.fail(new OauthMissing({ providerID: input.providerID }))
|
||||
|
||||
if (match.method === "code" && !input.code)
|
||||
return yield* Effect.fail(new OauthCodeMissing({ providerID: input.providerID }))
|
||||
|
||||
const result = yield* Effect.promise(() =>
|
||||
match.method === "code" ? match.callback(input.code!) : match.callback(),
|
||||
)
|
||||
|
||||
if (!result || result.type !== "success") return yield* Effect.fail(new OauthCallbackFailed({}))
|
||||
|
||||
if ("key" in result) {
|
||||
@@ -148,18 +131,10 @@ export class ProviderAuthService extends ServiceMap.Service<ProviderAuthService,
|
||||
}
|
||||
})
|
||||
|
||||
const api = Effect.fn("ProviderAuthService.api")(function* (input: { providerID: ProviderID; key: string }) {
|
||||
yield* auth.set(input.providerID, {
|
||||
type: "api",
|
||||
key: input.key,
|
||||
})
|
||||
})
|
||||
|
||||
return ProviderAuthService.of({
|
||||
methods,
|
||||
authorize,
|
||||
callback,
|
||||
api,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
@@ -1,25 +1,16 @@
|
||||
import { Effect, ManagedRuntime } from "effect"
|
||||
import z from "zod"
|
||||
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
import { fn } from "@/util/fn"
|
||||
import * as S from "./auth-service"
|
||||
import { ProviderID } from "./schema"
|
||||
|
||||
// Separate runtime: ProviderAuthService can't join the shared runtime because
|
||||
// runtime.ts → auth-service.ts → provider/auth.ts creates a circular import.
|
||||
// AuthService is stateless file I/O so the duplicate instance is harmless.
|
||||
const rt = ManagedRuntime.make(S.ProviderAuthService.defaultLayer)
|
||||
|
||||
function runPromise<A>(f: (service: S.ProviderAuthService.Service) => Effect.Effect<A, S.ProviderAuthError>) {
|
||||
return rt.runPromise(S.ProviderAuthService.use(f))
|
||||
}
|
||||
|
||||
export namespace ProviderAuth {
|
||||
export const Method = S.Method
|
||||
export type Method = S.Method
|
||||
|
||||
export async function methods() {
|
||||
return runPromise((service) => service.methods())
|
||||
return runPromiseInstance(S.ProviderAuthService.use((service) => service.methods()))
|
||||
}
|
||||
|
||||
export const Authorization = S.Authorization
|
||||
@@ -30,7 +21,8 @@ export namespace ProviderAuth {
|
||||
providerID: ProviderID.zod,
|
||||
method: z.number(),
|
||||
}),
|
||||
async (input): Promise<Authorization | undefined> => runPromise((service) => service.authorize(input)),
|
||||
async (input): Promise<Authorization | undefined> =>
|
||||
runPromiseInstance(S.ProviderAuthService.use((service) => service.authorize(input))),
|
||||
)
|
||||
|
||||
export const callback = fn(
|
||||
@@ -39,15 +31,7 @@ export namespace ProviderAuth {
|
||||
method: z.number(),
|
||||
code: z.string().optional(),
|
||||
}),
|
||||
async (input) => runPromise((service) => service.callback(input)),
|
||||
)
|
||||
|
||||
export const api = fn(
|
||||
z.object({
|
||||
providerID: ProviderID.zod,
|
||||
key: z.string(),
|
||||
}),
|
||||
async (input) => runPromise((service) => service.api(input)),
|
||||
async (input) => runPromiseInstance(S.ProviderAuthService.use((service) => service.callback(input))),
|
||||
)
|
||||
|
||||
export import OauthMissing = S.OauthMissing
|
||||
|
||||
@@ -167,40 +167,44 @@ export namespace Pty {
|
||||
subscribers: new Map(),
|
||||
}
|
||||
state().set(id, session)
|
||||
ptyProcess.onData((chunk) => {
|
||||
session.cursor += chunk.length
|
||||
ptyProcess.onData(
|
||||
Instance.bind((chunk) => {
|
||||
session.cursor += chunk.length
|
||||
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
if (ws.readyState !== 1) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
if (ws.readyState !== 1) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
if (ws.data !== key) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(chunk)
|
||||
} catch {
|
||||
session.subscribers.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
if (ws.data !== key) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(chunk)
|
||||
} catch {
|
||||
session.subscribers.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
session.buffer += chunk
|
||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||
const excess = session.buffer.length - BUFFER_LIMIT
|
||||
session.buffer = session.buffer.slice(excess)
|
||||
session.bufferCursor += excess
|
||||
})
|
||||
ptyProcess.onExit(({ exitCode }) => {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Bus.publish(Event.Exited, { id, exitCode })
|
||||
remove(id)
|
||||
})
|
||||
session.buffer += chunk
|
||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||
const excess = session.buffer.length - BUFFER_LIMIT
|
||||
session.buffer = session.buffer.slice(excess)
|
||||
session.bufferCursor += excess
|
||||
}),
|
||||
)
|
||||
ptyProcess.onExit(
|
||||
Instance.bind(({ exitCode }) => {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Bus.publish(Event.Exited, { id, exitCode })
|
||||
remove(id)
|
||||
}),
|
||||
)
|
||||
Bus.publish(Event.Created, { info })
|
||||
return info
|
||||
}
|
||||
|
||||
@@ -1,13 +1,8 @@
|
||||
import { Effect } from "effect"
|
||||
import { runtime } from "@/effect/runtime"
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
import * as S from "./service"
|
||||
import type { QuestionID } from "./schema"
|
||||
import type { SessionID, MessageID } from "@/session/schema"
|
||||
|
||||
function runPromise<A, E>(f: (service: S.QuestionService.Service) => Effect.Effect<A, E>) {
|
||||
return runtime.runPromise(S.QuestionService.use(f))
|
||||
}
|
||||
|
||||
export namespace Question {
|
||||
export const Option = S.Option
|
||||
export type Option = S.Option
|
||||
@@ -27,18 +22,18 @@ export namespace Question {
|
||||
questions: Info[]
|
||||
tool?: { messageID: MessageID; callID: string }
|
||||
}): Promise<Answer[]> {
|
||||
return runPromise((service) => service.ask(input))
|
||||
return runPromiseInstance(S.QuestionService.use((service) => service.ask(input)))
|
||||
}
|
||||
|
||||
export async function reply(input: { requestID: QuestionID; answers: Answer[] }): Promise<void> {
|
||||
return runPromise((service) => service.reply(input))
|
||||
return runPromiseInstance(S.QuestionService.use((service) => service.reply(input)))
|
||||
}
|
||||
|
||||
export async function reject(requestID: QuestionID): Promise<void> {
|
||||
return runPromise((service) => service.reject(requestID))
|
||||
return runPromiseInstance(S.QuestionService.use((service) => service.reject(requestID)))
|
||||
}
|
||||
|
||||
export async function list(): Promise<Request[]> {
|
||||
return runPromise((service) => service.list())
|
||||
return runPromiseInstance(S.QuestionService.use((service) => service.list()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
|
||||
import { Bus } from "@/bus"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { InstanceState } from "@/util/instance-state"
|
||||
import { Log } from "@/util/log"
|
||||
import z from "zod"
|
||||
import { QuestionID } from "./schema"
|
||||
@@ -104,18 +103,13 @@ export class QuestionService extends ServiceMap.Service<QuestionService, Questio
|
||||
static readonly layer = Layer.effect(
|
||||
QuestionService,
|
||||
Effect.gen(function* () {
|
||||
const instanceState = yield* InstanceState.make<Map<QuestionID, PendingEntry>>(() =>
|
||||
Effect.succeed(new Map<QuestionID, PendingEntry>()),
|
||||
)
|
||||
|
||||
const getPending = InstanceState.get(instanceState)
|
||||
const pending = new Map<QuestionID, PendingEntry>()
|
||||
|
||||
const ask = Effect.fn("QuestionService.ask")(function* (input: {
|
||||
sessionID: SessionID
|
||||
questions: Info[]
|
||||
tool?: { messageID: MessageID; callID: string }
|
||||
}) {
|
||||
const pending = yield* getPending
|
||||
const id = QuestionID.ascending()
|
||||
log.info("asking", { id, questions: input.questions.length })
|
||||
|
||||
@@ -138,7 +132,6 @@ export class QuestionService extends ServiceMap.Service<QuestionService, Questio
|
||||
})
|
||||
|
||||
const reply = Effect.fn("QuestionService.reply")(function* (input: { requestID: QuestionID; answers: Answer[] }) {
|
||||
const pending = yield* getPending
|
||||
const existing = pending.get(input.requestID)
|
||||
if (!existing) {
|
||||
log.warn("reply for unknown request", { requestID: input.requestID })
|
||||
@@ -155,7 +148,6 @@ export class QuestionService extends ServiceMap.Service<QuestionService, Questio
|
||||
})
|
||||
|
||||
const reject = Effect.fn("QuestionService.reject")(function* (requestID: QuestionID) {
|
||||
const pending = yield* getPending
|
||||
const existing = pending.get(requestID)
|
||||
if (!existing) {
|
||||
log.warn("reject for unknown request", { requestID })
|
||||
@@ -171,7 +163,6 @@ export class QuestionService extends ServiceMap.Service<QuestionService, Questio
|
||||
})
|
||||
|
||||
const list = Effect.fn("QuestionService.list")(function* () {
|
||||
const pending = yield* getPending
|
||||
return Array.from(pending.values(), (x) => x.info)
|
||||
})
|
||||
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
import { Effect, ScopedCache, Scope } from "effect"
|
||||
|
||||
import { Instance } from "@/project/instance"
|
||||
|
||||
type Disposer = (directory: string) => Effect.Effect<void>
|
||||
const disposers = new Set<Disposer>()
|
||||
|
||||
const TypeId = "~opencode/InstanceState"
|
||||
|
||||
/**
|
||||
* Effect version of `Instance.state` — lazily-initialized, per-directory
|
||||
* cached state for Effect services.
|
||||
*
|
||||
* Values are created on first access for a given directory and cached for
|
||||
* subsequent reads. Concurrent access shares a single initialization —
|
||||
* no duplicate work or races. Use `Effect.acquireRelease` in `init` if
|
||||
* the value needs cleanup on disposal.
|
||||
*/
|
||||
export interface InstanceState<A, E = never, R = never> {
|
||||
readonly [TypeId]: typeof TypeId
|
||||
readonly cache: ScopedCache.ScopedCache<string, A, E, R>
|
||||
}
|
||||
|
||||
export namespace InstanceState {
|
||||
/** Create a new InstanceState with the given initializer. */
|
||||
export const make = <A, E = never, R = never>(
|
||||
init: (directory: string) => Effect.Effect<A, E, R | Scope.Scope>,
|
||||
): Effect.Effect<InstanceState<A, E, Exclude<R, Scope.Scope>>, never, R | Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
const cache = yield* ScopedCache.make<string, A, E, R>({
|
||||
capacity: Number.POSITIVE_INFINITY,
|
||||
lookup: init,
|
||||
})
|
||||
|
||||
const disposer: Disposer = (directory) => ScopedCache.invalidate(cache, directory)
|
||||
disposers.add(disposer)
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => void disposers.delete(disposer)))
|
||||
|
||||
return {
|
||||
[TypeId]: TypeId,
|
||||
cache,
|
||||
}
|
||||
})
|
||||
|
||||
/** Get the cached value for the current directory, initializing it if needed. */
|
||||
export const get = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory))
|
||||
|
||||
/** Check whether a value exists for the current directory. */
|
||||
export const has = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory))
|
||||
|
||||
/** Invalidate the cached value for the current directory. */
|
||||
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory))
|
||||
|
||||
/** Invalidate the given directory across all InstanceState caches. */
|
||||
export const dispose = (directory: string) =>
|
||||
Effect.all(
|
||||
[...disposers].map((disposer) => disposer(directory)),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
}
|
||||
250
packages/opencode/test/file/watcher.test.ts
Normal file
250
packages/opencode/test/file/watcher.test.ts
Normal file
@@ -0,0 +1,250 @@
|
||||
import { $ } from "bun"
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import fs from "fs/promises"
|
||||
import path from "path"
|
||||
import { ConfigProvider, Deferred, Effect, Fiber, Layer, ManagedRuntime, Option } from "effect"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
|
||||
import { InstanceContext } from "../../src/effect/instances"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
|
||||
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
|
||||
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const configLayer = ConfigProvider.layer(
|
||||
ConfigProvider.fromUnknown({
|
||||
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
|
||||
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
|
||||
}),
|
||||
)
|
||||
|
||||
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
|
||||
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
|
||||
|
||||
/** Run `body` with a live FileWatcherService. Runtime is acquired/released via Effect.scoped. */
|
||||
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
|
||||
return Instance.provide({
|
||||
directory,
|
||||
fn: () =>
|
||||
Effect.gen(function* () {
|
||||
const ctx = Layer.sync(InstanceContext, () =>
|
||||
InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
|
||||
)
|
||||
const layer = Layer.fresh(FileWatcherService.layer).pipe(Layer.provide(ctx), Layer.provide(configLayer))
|
||||
const rt = yield* Effect.acquireRelease(
|
||||
Effect.sync(() => ManagedRuntime.make(layer)),
|
||||
(rt) => Effect.promise(() => rt.dispose()),
|
||||
)
|
||||
yield* Effect.promise(() => rt.runPromise(FileWatcherService.use((s) => s.init())))
|
||||
yield* ready(directory)
|
||||
yield* body
|
||||
}).pipe(Effect.scoped, Effect.runPromise),
|
||||
})
|
||||
}
|
||||
|
||||
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
|
||||
let done = false
|
||||
|
||||
function on(evt: BusUpdate) {
|
||||
if (done) return
|
||||
if (evt.directory !== directory) return
|
||||
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
|
||||
if (!check(evt.payload.properties)) return
|
||||
hit(evt.payload.properties)
|
||||
}
|
||||
|
||||
function cleanup() {
|
||||
if (done) return
|
||||
done = true
|
||||
GlobalBus.off("event", on)
|
||||
}
|
||||
|
||||
GlobalBus.on("event", on)
|
||||
return cleanup
|
||||
}
|
||||
|
||||
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
|
||||
return Effect.callback<WatcherEvent>((resume) => {
|
||||
const cleanup = listen(directory, check, (evt) => {
|
||||
cleanup()
|
||||
resume(Effect.succeed(evt))
|
||||
})
|
||||
return Effect.sync(cleanup)
|
||||
}).pipe(Effect.timeout("5 seconds"))
|
||||
}
|
||||
|
||||
function nextUpdate<E>(directory: string, check: (evt: WatcherEvent) => boolean, trigger: Effect.Effect<void, E>) {
|
||||
return Effect.acquireUseRelease(
|
||||
wait(directory, check).pipe(Effect.forkChild({ startImmediately: true })),
|
||||
(fiber) =>
|
||||
Effect.gen(function* () {
|
||||
yield* trigger
|
||||
return yield* Fiber.join(fiber)
|
||||
}),
|
||||
Fiber.interrupt,
|
||||
)
|
||||
}
|
||||
|
||||
/** Effect that asserts no matching event arrives within `ms`. */
|
||||
function noUpdate<E>(
|
||||
directory: string,
|
||||
check: (evt: WatcherEvent) => boolean,
|
||||
trigger: Effect.Effect<void, E>,
|
||||
ms = 500,
|
||||
) {
|
||||
return Effect.gen(function* () {
|
||||
const deferred = yield* Deferred.make<WatcherEvent>()
|
||||
|
||||
yield* Effect.acquireUseRelease(
|
||||
Effect.sync(() =>
|
||||
listen(directory, check, (evt) => {
|
||||
Effect.runSync(Deferred.succeed(deferred, evt))
|
||||
}),
|
||||
),
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
yield* trigger
|
||||
expect(yield* Deferred.await(deferred).pipe(Effect.timeoutOption(`${ms} millis`))).toEqual(Option.none())
|
||||
}),
|
||||
(cleanup) => Effect.sync(cleanup),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
function ready(directory: string) {
|
||||
const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`)
|
||||
const head = path.join(directory, ".git", "HEAD")
|
||||
|
||||
return Effect.gen(function* () {
|
||||
yield* nextUpdate(
|
||||
directory,
|
||||
(evt) => evt.file === file && evt.event === "add",
|
||||
Effect.promise(() => fs.writeFile(file, "ready")),
|
||||
).pipe(Effect.ensuring(Effect.promise(() => fs.rm(file, { force: true }).catch(() => undefined))), Effect.asVoid)
|
||||
|
||||
const git = yield* Effect.promise(() =>
|
||||
fs
|
||||
.stat(head)
|
||||
.then(() => true)
|
||||
.catch(() => false),
|
||||
)
|
||||
if (!git) return
|
||||
|
||||
const branch = `watch-${Math.random().toString(36).slice(2)}`
|
||||
const hash = yield* Effect.promise(() => $`git rev-parse HEAD`.cwd(directory).quiet().text())
|
||||
yield* nextUpdate(
|
||||
directory,
|
||||
(evt) => evt.file === head && evt.event !== "unlink",
|
||||
Effect.promise(async () => {
|
||||
await fs.writeFile(path.join(directory, ".git", "refs", "heads", branch), hash.trim() + "\n")
|
||||
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
|
||||
}),
|
||||
).pipe(Effect.asVoid)
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describeWatcher("FileWatcherService", () => {
|
||||
afterEach(() => Instance.disposeAll())
|
||||
|
||||
test("publishes root create, update, and delete events", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const file = path.join(tmp.path, "watch.txt")
|
||||
const dir = tmp.path
|
||||
const cases = [
|
||||
{ event: "add" as const, trigger: Effect.promise(() => fs.writeFile(file, "a")) },
|
||||
{ event: "change" as const, trigger: Effect.promise(() => fs.writeFile(file, "b")) },
|
||||
{ event: "unlink" as const, trigger: Effect.promise(() => fs.unlink(file)) },
|
||||
]
|
||||
|
||||
await withWatcher(
|
||||
dir,
|
||||
Effect.forEach(cases, ({ event, trigger }) =>
|
||||
nextUpdate(dir, (evt) => evt.file === file && evt.event === event, trigger).pipe(
|
||||
Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event }))),
|
||||
),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("watches non-git roots", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const file = path.join(tmp.path, "plain.txt")
|
||||
const dir = tmp.path
|
||||
|
||||
await withWatcher(
|
||||
dir,
|
||||
nextUpdate(
|
||||
dir,
|
||||
(e) => e.file === file && e.event === "add",
|
||||
Effect.promise(() => fs.writeFile(file, "plain")),
|
||||
).pipe(Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event: "add" })))),
|
||||
)
|
||||
})
|
||||
|
||||
test("cleanup stops publishing events", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const file = path.join(tmp.path, "after-dispose.txt")
|
||||
|
||||
// Start and immediately stop the watcher (withWatcher disposes on exit)
|
||||
await withWatcher(tmp.path, Effect.void)
|
||||
|
||||
// Now write a file — no watcher should be listening
|
||||
await Effect.runPromise(
|
||||
noUpdate(
|
||||
tmp.path,
|
||||
(e) => e.file === file,
|
||||
Effect.promise(() => fs.writeFile(file, "gone")),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("ignores .git/index changes", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const gitIndex = path.join(tmp.path, ".git", "index")
|
||||
const edit = path.join(tmp.path, "tracked.txt")
|
||||
|
||||
await withWatcher(
|
||||
tmp.path,
|
||||
noUpdate(
|
||||
tmp.path,
|
||||
(e) => e.file === gitIndex,
|
||||
Effect.promise(async () => {
|
||||
await fs.writeFile(edit, "a")
|
||||
await $`git add .`.cwd(tmp.path).quiet().nothrow()
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("publishes .git/HEAD events", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const head = path.join(tmp.path, ".git", "HEAD")
|
||||
const branch = `watch-${Math.random().toString(36).slice(2)}`
|
||||
await $`git branch ${branch}`.cwd(tmp.path).quiet()
|
||||
|
||||
await withWatcher(
|
||||
tmp.path,
|
||||
nextUpdate(
|
||||
tmp.path,
|
||||
(evt) => evt.file === head && evt.event !== "unlink",
|
||||
Effect.promise(() => fs.writeFile(head, `ref: refs/heads/${branch}\n`)),
|
||||
).pipe(
|
||||
Effect.tap((evt) =>
|
||||
Effect.sync(() => {
|
||||
expect(evt.file).toBe(head)
|
||||
expect(["add", "change"]).toContain(evt.event)
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
})
|
||||
})
|
||||
@@ -1,7 +1,9 @@
|
||||
import { test, expect } from "bun:test"
|
||||
import { afterEach, test, expect } from "bun:test"
|
||||
import os from "os"
|
||||
import { Effect } from "effect"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { runtime } from "../../src/effect/runtime"
|
||||
import { Instances } from "../../src/effect/instances"
|
||||
import { PermissionNext } from "../../src/permission/next"
|
||||
import * as S from "../../src/permission/service"
|
||||
import { PermissionID } from "../../src/permission/schema"
|
||||
@@ -9,6 +11,10 @@ import { Instance } from "../../src/project/instance"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { MessageID, SessionID } from "../../src/session/schema"
|
||||
|
||||
afterEach(async () => {
|
||||
await Instance.disposeAll()
|
||||
})
|
||||
|
||||
async function rejectAll(message?: string) {
|
||||
for (const req of await PermissionNext.list()) {
|
||||
await PermissionNext.reply({
|
||||
@@ -971,7 +977,7 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ask = PermissionNext.ask({
|
||||
const err = await PermissionNext.ask({
|
||||
sessionID: SessionID.make("session_test"),
|
||||
permission: "bash",
|
||||
patterns: ["echo hello", "rm -rf /"],
|
||||
@@ -981,24 +987,12 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
|
||||
{ permission: "bash", pattern: "echo *", action: "ask" },
|
||||
{ permission: "bash", pattern: "rm *", action: "deny" },
|
||||
],
|
||||
})
|
||||
}).then(
|
||||
() => undefined,
|
||||
(err) => err,
|
||||
)
|
||||
|
||||
const out = await Promise.race([
|
||||
ask.then(
|
||||
() => ({ ok: true as const, err: undefined }),
|
||||
(err) => ({ ok: false as const, err }),
|
||||
),
|
||||
Bun.sleep(100).then(() => "timeout" as const),
|
||||
])
|
||||
|
||||
if (out === "timeout") {
|
||||
await rejectAll()
|
||||
await ask.catch(() => {})
|
||||
throw new Error("ask timed out instead of denying immediately")
|
||||
}
|
||||
|
||||
expect(out.ok).toBe(false)
|
||||
expect(out.err).toBeInstanceOf(PermissionNext.DeniedError)
|
||||
expect(err).toBeInstanceOf(PermissionNext.DeniedError)
|
||||
expect(await PermissionNext.list()).toHaveLength(0)
|
||||
},
|
||||
})
|
||||
@@ -1020,7 +1014,7 @@ test("ask - abort should clear pending request", async () => {
|
||||
always: [],
|
||||
ruleset: [{ permission: "bash", pattern: "*", action: "ask" }],
|
||||
}),
|
||||
),
|
||||
).pipe(Effect.provide(Instances.get(Instance.directory))),
|
||||
{ signal: ctl.signal },
|
||||
)
|
||||
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
import { afterEach, expect, test } from "bun:test"
|
||||
import { Auth } from "../../src/auth"
|
||||
import { ProviderAuth } from "../../src/provider/auth"
|
||||
import { ProviderID } from "../../src/provider/schema"
|
||||
|
||||
afterEach(async () => {
|
||||
await Auth.remove("test-provider-auth")
|
||||
})
|
||||
|
||||
test("ProviderAuth.api persists auth via AuthService", async () => {
|
||||
await ProviderAuth.api({
|
||||
providerID: ProviderID.make("test-provider-auth"),
|
||||
key: "sk-test",
|
||||
})
|
||||
|
||||
expect(await Auth.get("test-provider-auth")).toEqual({
|
||||
type: "api",
|
||||
key: "sk-test",
|
||||
})
|
||||
})
|
||||
@@ -6,7 +6,7 @@ import type { PtyID } from "../../src/pty/schema"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
|
||||
const wait = async (fn: () => boolean, ms = 2000) => {
|
||||
const wait = async (fn: () => boolean, ms = 5000) => {
|
||||
const end = Date.now() + ms
|
||||
while (Date.now() < end) {
|
||||
if (fn()) return
|
||||
@@ -20,7 +20,7 @@ const pick = (log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }>,
|
||||
}
|
||||
|
||||
describe("pty", () => {
|
||||
test("publishes created, exited, deleted in order for /bin/ls + remove", async () => {
|
||||
test("publishes created, exited, deleted in order for a short-lived process", async () => {
|
||||
if (process.platform === "win32") return
|
||||
|
||||
await using dir = await tmpdir({ git: true })
|
||||
@@ -37,7 +37,11 @@ describe("pty", () => {
|
||||
|
||||
let id: PtyID | undefined
|
||||
try {
|
||||
const info = await Pty.create({ command: "/bin/ls", title: "ls" })
|
||||
const info = await Pty.create({
|
||||
command: "/usr/bin/env",
|
||||
args: ["sh", "-c", "sleep 0.1"],
|
||||
title: "sleep",
|
||||
})
|
||||
id = info.id
|
||||
|
||||
await wait(() => pick(log, id!).includes("exited"))
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import { test, expect } from "bun:test"
|
||||
import { afterEach, test, expect } from "bun:test"
|
||||
import { Question } from "../../src/question"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { QuestionID } from "../../src/question/schema"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { SessionID } from "../../src/session/schema"
|
||||
|
||||
afterEach(async () => {
|
||||
await Instance.disposeAll()
|
||||
})
|
||||
|
||||
/** Reject all pending questions so dangling Deferred fibers don't hang the test. */
|
||||
async function rejectAll() {
|
||||
const pending = await Question.list()
|
||||
|
||||
@@ -1,261 +0,0 @@
|
||||
import { afterEach, expect, test } from "bun:test"
|
||||
import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect"
|
||||
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { InstanceState } from "../../src/util/instance-state"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
async function access<A, E>(state: InstanceState<A, E>, dir: string) {
|
||||
return Instance.provide({
|
||||
directory: dir,
|
||||
fn: () => Effect.runPromise(InstanceState.get(state)),
|
||||
})
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
await Instance.disposeAll()
|
||||
})
|
||||
|
||||
test("InstanceState caches values for the same instance", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() => Effect.sync(() => ({ n: ++n })))
|
||||
|
||||
const a = yield* Effect.promise(() => access(state, tmp.path))
|
||||
const b = yield* Effect.promise(() => access(state, tmp.path))
|
||||
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState isolates values by directory", async () => {
|
||||
await using a = await tmpdir()
|
||||
await using b = await tmpdir()
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((dir) => Effect.sync(() => ({ dir, n: ++n })))
|
||||
|
||||
const x = yield* Effect.promise(() => access(state, a.path))
|
||||
const y = yield* Effect.promise(() => access(state, b.path))
|
||||
const z = yield* Effect.promise(() => access(state, a.path))
|
||||
|
||||
expect(x).toBe(z)
|
||||
expect(x).not.toBe(y)
|
||||
expect(n).toBe(2)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState is disposed on instance reload", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const seen: string[] = []
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ n: ++n })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(String(value.n))
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const a = yield* Effect.promise(() => access(state, tmp.path))
|
||||
yield* Effect.promise(() => Instance.reload({ directory: tmp.path }))
|
||||
const b = yield* Effect.promise(() => access(state, tmp.path))
|
||||
|
||||
expect(a).not.toBe(b)
|
||||
expect(seen).toEqual(["1"])
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState is disposed on disposeAll", async () => {
|
||||
await using a = await tmpdir()
|
||||
await using b = await tmpdir()
|
||||
const seen: string[] = []
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((dir) =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ dir })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(value.dir)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
yield* Effect.promise(() => access(state, a.path))
|
||||
yield* Effect.promise(() => access(state, b.path))
|
||||
yield* Effect.promise(() => Instance.disposeAll())
|
||||
|
||||
expect(seen.sort()).toEqual([a.path, b.path].sort())
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState.get reads correct directory per-evaluation (not captured once)", async () => {
|
||||
await using a = await tmpdir()
|
||||
await using b = await tmpdir()
|
||||
|
||||
// Regression: InstanceState.get must be lazy (Effect.suspend) so the
|
||||
// directory is read per-evaluation, not captured once at the call site.
|
||||
// Without this, a service built inside a ManagedRuntime Layer would
|
||||
// freeze to whichever directory triggered the first layer build.
|
||||
|
||||
interface TestApi {
|
||||
readonly getDir: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class TestService extends ServiceMap.Service<TestService, TestApi>()("@test/ALS-lazy") {
|
||||
static readonly layer = Layer.effect(
|
||||
TestService,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((dir) => Effect.sync(() => dir))
|
||||
// `get` is created once during layer build — must be lazy
|
||||
const get = InstanceState.get(state)
|
||||
|
||||
const getDir = Effect.fn("TestService.getDir")(function* () {
|
||||
return yield* get
|
||||
})
|
||||
|
||||
return TestService.of({ getDir })
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(TestService.layer)
|
||||
|
||||
try {
|
||||
const resultA = await Instance.provide({
|
||||
directory: a.path,
|
||||
fn: () => rt.runPromise(TestService.use((s) => s.getDir())),
|
||||
})
|
||||
expect(resultA).toBe(a.path)
|
||||
|
||||
// Second call with different directory must NOT return A's directory
|
||||
const resultB = await Instance.provide({
|
||||
directory: b.path,
|
||||
fn: () => rt.runPromise(TestService.use((s) => s.getDir())),
|
||||
})
|
||||
expect(resultB).toBe(b.path)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState.get isolates concurrent fibers across real delays, yields, and timer callbacks", async () => {
|
||||
await using a = await tmpdir()
|
||||
await using b = await tmpdir()
|
||||
await using c = await tmpdir()
|
||||
|
||||
// Adversarial: concurrent fibers with real timer delays (macrotask
|
||||
// boundaries via setTimeout/Bun.sleep), explicit scheduler yields,
|
||||
// and many async steps. If ALS context leaks or gets lost at any
|
||||
// point, a fiber will see the wrong directory.
|
||||
|
||||
interface TestApi {
|
||||
readonly getDir: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class TestService extends ServiceMap.Service<TestService, TestApi>()("@test/ALS-adversarial") {
|
||||
static readonly layer = Layer.effect(
|
||||
TestService,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((dir) => Effect.sync(() => dir))
|
||||
|
||||
const getDir = Effect.fn("TestService.getDir")(function* () {
|
||||
// Mix of async boundary types to maximise interleaving:
|
||||
// 1. Real timer delay (macrotask — setTimeout under the hood)
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
// 2. Effect.sleep (Effect's own timer, uses its internal scheduler)
|
||||
yield* Effect.sleep(Duration.millis(1))
|
||||
// 3. Explicit scheduler yields
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.yieldNow
|
||||
}
|
||||
// 4. Microtask boundaries
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.promise(() => Promise.resolve())
|
||||
}
|
||||
// 5. Another Effect.sleep
|
||||
yield* Effect.sleep(Duration.millis(2))
|
||||
// 6. Another real timer to force a second macrotask hop
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
// NOW read the directory — ALS must still be correct
|
||||
return yield* InstanceState.get(state)
|
||||
})
|
||||
|
||||
return TestService.of({ getDir })
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(TestService.layer)
|
||||
|
||||
try {
|
||||
const [resultA, resultB, resultC] = await Promise.all([
|
||||
Instance.provide({
|
||||
directory: a.path,
|
||||
fn: () => rt.runPromise(TestService.use((s) => s.getDir())),
|
||||
}),
|
||||
Instance.provide({
|
||||
directory: b.path,
|
||||
fn: () => rt.runPromise(TestService.use((s) => s.getDir())),
|
||||
}),
|
||||
Instance.provide({
|
||||
directory: c.path,
|
||||
fn: () => rt.runPromise(TestService.use((s) => s.getDir())),
|
||||
}),
|
||||
])
|
||||
|
||||
expect(resultA).toBe(a.path)
|
||||
expect(resultB).toBe(b.path)
|
||||
expect(resultC).toBe(c.path)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState dedupes concurrent lookups for the same directory", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.promise(async () => {
|
||||
n += 1
|
||||
await Bun.sleep(10)
|
||||
return { n }
|
||||
}),
|
||||
)
|
||||
|
||||
const [a, b] = yield* Effect.promise(() => Promise.all([access(state, tmp.path), access(state, tmp.path)]))
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
Reference in New Issue
Block a user