Compare commits

..

22 Commits

Author SHA1 Message Date
Kit Langton
a9bde4a13d fix: guard session projector against missing project FK
Same pattern as the message/part projectors — catch and log FK
constraint errors instead of throwing. Fixes flaky CI where test
ordering causes Session.create to fire before the project row exists.
2026-03-25 21:43:34 -04:00
Kit Langton
31299c6968 Merge branch 'dev' into kit/effectify-lsp 2026-03-25 21:29:16 -04:00
Kit Langton
ba244a6e62 test: restore 5 workers on Windows e2e (#19188) 2026-03-25 21:28:18 -04:00
Kit Langton
cf61f7eb59 fix: use makeRuntime instead of makeRunPromise in LSP service 2026-03-25 21:26:17 -04:00
Vladimir Glafirov
7cb690d7e5 fix: bump gitlab-ai-provider to 5.3.3 for DWS tool approval support (#19185) 2026-03-26 01:17:51 +00:00
Kit Langton
8acccc64df Merge branch 'dev' into kit/effectify-lsp 2026-03-25 21:06:08 -04:00
opencode-agent[bot]
31ad6e85ba chore: generate 2026-03-26 00:56:34 +00:00
Kit Langton
ea04b23745 skill: use Effect.cached for load deduplication (#19165) 2026-03-26 00:55:43 +00:00
Kit Langton
fd173392ac Merge dev into kit/effectify-lsp 2026-03-25 20:43:56 -04:00
Kit Langton
85243d2e5b Merge dev into kit/effectify-lsp 2026-03-25 20:42:52 -04:00
Kit Langton
222782b5d1 Resolve effect-migration merge conflict 2026-03-25 20:40:44 -04:00
Aiden Cline
05c3cfb2aa ignore: update disavowed list (#19184) 2026-03-26 00:20:43 +00:00
Kit Langton
f54e4b60cc file: use Effect.cached for scan deduplication (#19164) 2026-03-25 20:19:58 -04:00
Kit Langton
97c15a087d effectify Bus service: migrate to Effect PubSub + InstanceState (#18579) 2026-03-25 20:19:24 -04:00
Kit Langton
b90de755f9 fix+refactor(mcp): lifecycle tests, cancelPending fix, Effect migration (#19042) 2026-03-25 20:15:05 -04:00
Kit Langton
a4dd023c79 Merge branch 'dev' into kit/effectify-lsp 2026-03-25 19:52:45 -04:00
Kit Langton
62f20f4986 fix(git): use CrossSpawnSpawner instead of NodeChildProcessSpawner 2026-03-25 15:50:01 -04:00
Kit Langton
e250a67001 Merge branch 'dev' into kit/effectify-lsp 2026-03-25 15:21:13 -04:00
Kit Langton
91378ec7db cleanup(lsp): extract callHierarchyRequest helper for incoming/outgoing calls 2026-03-25 14:04:31 -04:00
Kit Langton
85be6eaa7f test(lsp): add lifecycle tests for Effect service layer 2026-03-25 14:03:13 -04:00
Kit Langton
a265da7a8d fix(lsp): restore Instance.containsPath guard in getClients 2026-03-25 12:36:29 -04:00
Kit Langton
0c510b0e61 refactor(lsp): effectify LSP service with InstanceState
Migrate LSP from Instance.state() to Effect Service/Layer/InstanceState
pattern with makeRunPromise and async facades.

- InstanceState for per-directory lifecycle with ScopedCache
- Effect.fn tracing on all 14 service methods
- Effect.fnUntraced for internal helpers (getClients, runForFile, runForAll)
- addFinalizer for client shutdown on instance disposal
- No changes to server.ts, client.ts, launch.ts, or language.ts
- No consumer changes — all 15+ callers use unchanged async facades
2026-03-25 12:31:51 -04:00
59 changed files with 3447 additions and 2060 deletions

1
.github/VOUCHED.td vendored
View File

@@ -25,3 +25,4 @@ r44vc0rp
rekram1-node
-spider-yamet clawdbot/llm psychosis, spam pinging the team
thdxr
-OpenCodeEngineer bot that spams issues

View File

@@ -358,7 +358,7 @@
"drizzle-orm": "catalog:",
"effect": "catalog:",
"fuzzysort": "3.1.0",
"gitlab-ai-provider": "5.3.2",
"gitlab-ai-provider": "5.3.3",
"glob": "13.0.5",
"google-auth-library": "10.5.0",
"gray-matter": "4.0.3",
@@ -3037,7 +3037,7 @@
"github-slugger": ["github-slugger@2.0.0", "", {}, "sha512-IaOQ9puYtjrkq7Y0Ygl9KDZnrf/aiUJYUpVf89y8kyaxbRG7Y1SrX/jaumrv81vc61+kiMempujsM3Yw7w5qcw=="],
"gitlab-ai-provider": ["gitlab-ai-provider@5.3.2", "", { "dependencies": { "@anthropic-ai/sdk": "^0.71.0", "@anycable/core": "^0.9.2", "graphql-request": "^6.1.0", "isomorphic-ws": "^5.0.0", "openai": "^6.16.0", "socket.io-client": "^4.8.1", "vscode-jsonrpc": "^8.2.1", "zod": "^3.25.76" }, "peerDependencies": { "@ai-sdk/provider": ">=2.0.0", "@ai-sdk/provider-utils": ">=3.0.0" } }, "sha512-EiAipDMa4Ngsxp4MMaua5YHWsHhc9kGXKmBxulJg1Gueb+5IZmMwxaVtgWTGWZITxC3tzKEeRt/3U4McE2vTIA=="],
"gitlab-ai-provider": ["gitlab-ai-provider@5.3.3", "", { "dependencies": { "@anthropic-ai/sdk": "^0.71.0", "@anycable/core": "^0.9.2", "graphql-request": "^6.1.0", "isomorphic-ws": "^5.0.0", "openai": "^6.16.0", "socket.io-client": "^4.8.1", "vscode-jsonrpc": "^8.2.1", "zod": "^3.25.76" }, "peerDependencies": { "@ai-sdk/provider": ">=2.0.0", "@ai-sdk/provider-utils": ">=3.0.0" } }, "sha512-k0kRUoAhDvoRC28hQW4sPp+A3cfpT5c/oL9Ng10S0oBiF2Tci1AtsX1iclJM5Os8C1nIIAXBW8LMr0GY7rwcGA=="],
"glob": ["glob@13.0.5", "", { "dependencies": { "minimatch": "^10.2.1", "minipass": "^7.1.2", "path-scurry": "^2.0.0" } }, "sha512-BzXxZg24Ibra1pbQ/zE7Kys4Ua1ks7Bn6pKLkVPZ9FZe4JQS6/Q7ef3LG1H+k7lUf5l4T3PLSyYyYJVYUvfgTw=="],

View File

@@ -6,8 +6,7 @@ const serverHost = process.env.PLAYWRIGHT_SERVER_HOST ?? "127.0.0.1"
const serverPort = process.env.PLAYWRIGHT_SERVER_PORT ?? "4096"
const command = `bun run dev -- --host 0.0.0.0 --port ${port}`
const reuse = !process.env.CI
const workers =
Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? (process.platform === "win32" ? 2 : 5) : 0)) || undefined
const workers = Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? 5 : 0)) || undefined
export default defineConfig({
testDir: "./e2e",

View File

@@ -31,12 +31,14 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
- Use `Schema.Defect` instead of `unknown` for defect-like causes.
- In `Effect.gen` / `Effect.fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
## Runtime vs Instances
## Runtime vs InstanceState
- Use the shared runtime for process-wide services with one lifecycle for the whole app.
- Use `src/effect/instances.ts` for per-directory or per-project services that need `InstanceContext`, per-instance state, or per-instance cleanup.
- If two open directories should not share one copy of the service, it belongs in `Instances`.
- Instance-scoped services should read context from `InstanceContext`, not `Instance.*` globals.
- Use `makeRuntime` (from `src/effect/run-service.ts`) for all services. It returns `{ runPromise, runFork, runCallback }` backed by a shared `memoMap` that deduplicates layers.
- Use `InstanceState` (from `src/effect/instance-state.ts`) for per-directory or per-project state that needs per-instance cleanup. It uses `ScopedCache` keyed by directory — each open project gets its own state, automatically cleaned up on disposal.
- If two open directories should not share one copy of the service, it needs `InstanceState`.
- Do the work directly in the `InstanceState.make` closure — `ScopedCache` handles run-once semantics. Don't add fibers, `ensure()` callbacks, or `started` flags on top.
- Use `Effect.addFinalizer` or `Effect.acquireRelease` inside the `InstanceState.make` closure for cleanup (subscriptions, process teardown, etc.).
- Use `Effect.forkScoped` inside the closure for background stream consumers — the fiber is interrupted when the instance is disposed.
## Preferred Effect services
@@ -47,15 +49,11 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
- Prefer `Path.Path`, `Config`, `Clock`, and `DateTime` when those concerns are already inside Effect code.
- For background loops or scheduled tasks, use `Effect.repeat` or `Effect.schedule` with `Effect.forkScoped` in the layer definition.
## Effect.cached for deduplication
Use `Effect.cached` when multiple concurrent callers should share a single in-flight computation rather than storing `Fiber | undefined` or `Promise | undefined` manually. See `specs/effect-migration.md` for the full pattern.
## Instance.bind — ALS for native callbacks
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and restores it synchronously when called.
Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish` or anything that reads `Instance.directory`.
You do not need it for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers.

View File

@@ -121,7 +121,7 @@
"drizzle-orm": "catalog:",
"effect": "catalog:",
"fuzzysort": "3.1.0",
"gitlab-ai-provider": "5.3.2",
"gitlab-ai-provider": "5.3.3",
"glob": "13.0.5",
"google-auth-library": "10.5.0",
"gray-matter": "4.0.3",

View File

@@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`.
Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal.
Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`.
Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork, runCallback }`.
- Global services (no per-directory state): Account, Auth, Installation, Truncate
- Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth
@@ -46,7 +46,7 @@ export namespace Foo {
export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer))
// Per-service runtime (inside the namespace)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
// Async facade functions
export async function get(id: FooID) {
@@ -79,22 +79,24 @@ See `Auth.ZodInfo` for the canonical example.
The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for:
- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe:
- **Subscriptions**: Yield `Bus.Service` at the layer level, then use `Stream` + `forkScoped` inside the init closure. The fiber is automatically interrupted when the instance scope closes:
```ts
const bus = yield * Bus.Service
const cache =
yield *
InstanceState.make<State>(
Effect.fn("Foo.state")(function* (ctx) {
// ... load state ...
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribeAll((event) => {
yield* bus.subscribeAll().pipe(
Stream.runForEach((event) =>
Effect.sync(() => {
/* handle */
}),
),
(unsub) => Effect.sync(unsub),
Effect.forkScoped,
)
return {
@@ -104,36 +106,21 @@ const cache =
)
```
- **Resource cleanup**: Use `Effect.acquireRelease` or `Effect.addFinalizer` for resources that need teardown (native watchers, process handles, etc.):
```ts
yield *
Effect.acquireRelease(
Effect.sync(() => nativeAddon.watch(dir)),
(watcher) => Effect.sync(() => watcher.close()),
)
```
- **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal.
- **Side effects at init**: Config notification, event wiring, etc. all belong in the init closure. Callers just do `InstanceState.get(cache)` to trigger everything, and `ScopedCache` deduplicates automatically.
The key insight: don't split init into a separate method with a `started` flag. Put everything in the `InstanceState.make` closure and let `ScopedCache` handle the run-once semantics.
## Effect.cached for deduplication
Use `Effect.cached` when multiple concurrent callers should share a single in-flight computation. It memoizes the result and deduplicates concurrent fibers — second caller joins the first caller's fiber instead of starting a new one.
```ts
// Inside the layer — yield* to initialize the memo
let cached = yield* Effect.cached(loadExpensive())
const get = Effect.fn("Foo.get")(function* () {
return yield* cached // concurrent callers share the same fiber
})
// To invalidate: swap in a fresh memo
const invalidate = Effect.fn("Foo.invalidate")(function* () {
cached = yield* Effect.cached(loadExpensive())
})
```
Prefer `Effect.cached` over these patterns:
- Storing a `Fiber.Fiber | undefined` with manual check-and-fork (e.g. `file/index.ts` `ensure`)
- Storing a `Promise<void>` task for deduplication (e.g. `skill/index.ts` `ensure`)
- `let cached: X | undefined` with check-and-load (races when two callers see `undefined` before either resolves)
`Effect.cached` handles the run-once + concurrent-join semantics automatically. For invalidatable caches, reassign with `yield* Effect.cached(...)` — the old memo is discarded.
## Scheduled Tasks
For loops or periodic work, use `Effect.repeat` or `Effect.schedule` with `Effect.forkScoped` in the layer definition.
@@ -190,14 +177,14 @@ Still open and likely worth migrating:
- [x] `ToolRegistry`
- [ ] `Pty`
- [x] `Worktree`
- [ ] `Bus`
- [x] `Bus`
- [x] `Command`
- [x] `Config`
- [ ] `Config`
- [ ] `Session`
- [ ] `SessionProcessor`
- [ ] `SessionPrompt`
- [ ] `SessionCompaction`
- [ ] `Provider`
- [x] `Project`
- [ ] `LSP`
- [ ] `MCP`
- [x] `LSP`
- [x] `MCP`

View File

@@ -1,7 +1,7 @@
import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { AccountRepo, type AccountRow } from "./repo"
import {
@@ -379,7 +379,7 @@ export namespace Account {
export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer))
export const runPromise = makeRunPromise(Service, defaultLayer)
export const { runPromise } = makeRuntime(Service, defaultLayer)
export async function active(): Promise<Info | undefined> {
return Option.getOrUndefined(await runPromise((service) => service.active()))

View File

@@ -21,7 +21,7 @@ import { Plugin } from "@/plugin"
import { Skill } from "../skill"
import { Effect, ServiceMap, Layer } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace Agent {
export const Info = z
@@ -393,7 +393,7 @@ export namespace Agent {
export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(agent: string) {
return runPromise((svc) => svc.get(agent))

View File

@@ -1,6 +1,6 @@
import path from "path"
import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { zod } from "@/util/effect-zod"
import { Global } from "../global"
import { Filesystem } from "../util/filesystem"
@@ -95,7 +95,7 @@ export namespace Auth {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function get(providerID: string) {
return runPromise((service) => service.get(providerID))

View File

@@ -1,12 +1,14 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@@ -15,91 +17,168 @@ export namespace Bus {
}),
)
const state = Instance.state(
() => {
const subscriptions = new Map<any, Subscription[]>()
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
properties: z.infer<D["properties"]>
}
return {
subscriptions,
type State = {
wildcard: PubSub.PubSub<Payload>
typed: Map<string, PubSub.PubSub<Payload>>
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
readonly subscribeCallback: <D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => unknown,
) => Effect.Effect<() => void>
readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const cache = yield* InstanceState.make<State>(
Effect.fn("Bus.state")(function* (ctx) {
const wildcard = yield* PubSub.unbounded<Payload>()
const typed = new Map<string, PubSub.PubSub<Payload>>()
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
// Publish InstanceDisposed before shutting down so subscribers see it
yield* PubSub.publish(wildcard, {
type: InstanceDisposed.type,
properties: { directory: ctx.directory },
})
yield* PubSub.shutdown(wildcard)
for (const ps of typed.values()) {
yield* PubSub.shutdown(ps)
}
}),
)
return { wildcard, typed }
}),
)
function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
return Effect.gen(function* () {
let ps = state.typed.get(def.type)
if (!ps) {
ps = yield* PubSub.unbounded<Payload>()
state.typed.set(def.type, ps)
}
return ps as unknown as PubSub.PubSub<Payload<D>>
})
}
},
async (entry) => {
const wildcard = entry.subscriptions.get("*")
if (!wildcard) return
const event = {
type: InstanceDisposed.type,
properties: {
directory: Instance.directory,
},
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = state.typed.get(def.type)
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(state.wildcard, payload)
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
})
}
for (const sub of [...wildcard]) {
sub(event)
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
const ps = yield* getOrCreate(state, def)
return Stream.fromPubSub(ps)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
},
function subscribeAll(): Stream.Stream<Payload> {
log.info("subscribing", { type: "*" })
return Stream.unwrap(
Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
return Stream.fromPubSub(state.wildcard)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
}
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
log.info("subscribing", { type })
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
yield* Scope.provide(scope)(
Stream.fromSubscription(subscription).pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => Promise.resolve().then(() => callback(msg)),
catch: (cause) => {
log.error("subscriber failed", { type, cause })
},
}).pipe(Effect.ignore),
),
Effect.forkScoped,
),
)
return () => {
log.info("unsubscribing", { type })
Effect.runFork(Scope.close(scope, Exit.void))
}
})
}
const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => unknown,
) {
const state = yield* InstanceState.get(cache)
const ps = yield* getOrCreate(state, def)
return yield* on(ps, def.type, callback)
})
const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
const state = yield* InstanceState.get(cache)
return yield* on(state.wildcard, "*", callback)
})
return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
}),
)
export async function publish<Definition extends BusEvent.Definition>(
def: Definition,
properties: z.output<Definition["properties"]>,
const { runPromise, runSync } = makeRuntime(Service, layer)
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return runPromise((svc) => svc.publish(def, properties))
}
export function subscribe<D extends BusEvent.Definition>(
def: D,
callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
) {
const payload = {
type: def.type,
properties,
}
log.info("publishing", {
type: def.type,
})
const pending = []
for (const key of [def.type, "*"]) {
const match = [...(state().subscriptions.get(key) ?? [])]
for (const sub of match) {
pending.push(sub(payload))
}
}
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
return Promise.all(pending)
return runSync((svc) => svc.subscribeCallback(def, callback))
}
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
}
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: {
type: Definition["type"]
properties: z.infer<Definition["properties"]>
}) => "done" | undefined,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
})
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback)
}
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type })
const subscriptions = state().subscriptions
let match = subscriptions.get(type) ?? []
match.push(callback)
subscriptions.set(type, match)
return () => {
log.info("unsubscribing", { type })
const match = subscriptions.get(type)
if (!match) return
const index = match.indexOf(callback)
if (index === -1) return
match.splice(index, 1)
}
export function subscribeAll(callback: (event: any) => unknown) {
return runSync((svc) => svc.subscribeAllCallback(callback))
}
}

View File

@@ -137,7 +137,8 @@ export const rpc = {
})
},
async reload() {
await Config.invalidate(true)
Config.global.reset()
await Instance.disposeAll()
},
async setWorkspace(input: { workspaceID?: string }) {
startEventStream({ directory: process.cwd(), workspaceID: input.workspaceID })

View File

@@ -37,7 +37,7 @@ export function withNetworkOptions<T>(yargs: Argv<T>) {
}
export async function resolveNetworkOptions(args: NetworkOptions) {
const config = await Config.getGlobal()
const config = await Config.global()
const portExplicitlySet = process.argv.includes("--port")
const hostnameExplicitlySet = process.argv.includes("--hostname")
const mdnsExplicitlySet = process.argv.includes("--mdns")

View File

@@ -4,7 +4,7 @@ import { Flag } from "@/flag/flag"
import { Installation } from "@/installation"
export async function upgrade() {
const config = await Config.getGlobal()
const config = await Config.global()
const method = await Installation.method()
const latest = await Installation.latest(method).catch(() => {})
if (!latest) return

View File

@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -173,7 +173,7 @@ export namespace Command {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function get(name: string) {
return runPromise((svc) => svc.get(name))

View File

@@ -1,13 +1,14 @@
import { Log } from "../util/log"
import path from "path"
import { pathToFileURL } from "url"
import { pathToFileURL, fileURLToPath } from "url"
import { createRequire } from "module"
import os from "os"
import z from "zod"
import { ModelsDev } from "../provider/models"
import { mergeDeep, pipe, unique } from "remeda"
import { Global } from "../global"
import fsNode from "fs/promises"
import fs from "fs/promises"
import { lazy } from "../util/lazy"
import { NamedError } from "@opencode-ai/util/error"
import { Flag } from "../flag/flag"
import { Auth } from "../auth"
@@ -19,7 +20,7 @@ import {
parse as parseJsonc,
printParseErrorCode,
} from "jsonc-parser"
import { Instance, type InstanceContext } from "../project/instance"
import { Instance } from "../project/instance"
import { LSPServer } from "../lsp/server"
import { BunProc } from "@/bun"
import { Installation } from "@/installation"
@@ -37,10 +38,6 @@ import { ConfigPaths } from "./paths"
import { Filesystem } from "@/util/filesystem"
import { Process } from "@/util/process"
import { Lock } from "@/util/lock"
import { AppFileSystem } from "@/filesystem"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { Effect, Layer, ServiceMap } from "effect"
export namespace Config {
const ModelId = z.string().meta({ $ref: "https://models.dev/model-schema.json#/$defs/Model" })
@@ -78,6 +75,201 @@ export namespace Config {
return merged
}
export const state = Instance.state(async () => {
const auth = await Auth.all()
// Config loading order (low -> high precedence): https://opencode.ai/docs/config#precedence-order
// 1) Remote .well-known/opencode (org defaults)
// 2) Global config (~/.config/opencode/opencode.json{,c})
// 3) Custom config (OPENCODE_CONFIG)
// 4) Project config (opencode.json{,c})
// 5) .opencode directories (.opencode/agents/, .opencode/commands/, .opencode/plugins/, .opencode/opencode.json{,c})
// 6) Inline config (OPENCODE_CONFIG_CONTENT)
// Managed config directory is enterprise-only and always overrides everything above.
let result: Info = {}
for (const [key, value] of Object.entries(auth)) {
if (value.type === "wellknown") {
const url = key.replace(/\/+$/, "")
process.env[value.key] = value.token
log.debug("fetching remote config", { url: `${url}/.well-known/opencode` })
const response = await fetch(`${url}/.well-known/opencode`)
if (!response.ok) {
throw new Error(`failed to fetch remote config from ${url}: ${response.status}`)
}
const wellknown = (await response.json()) as any
const remoteConfig = wellknown.config ?? {}
// Add $schema to prevent load() from trying to write back to a non-existent file
if (!remoteConfig.$schema) remoteConfig.$schema = "https://opencode.ai/config.json"
result = mergeConfigConcatArrays(
result,
await load(JSON.stringify(remoteConfig), {
dir: path.dirname(`${url}/.well-known/opencode`),
source: `${url}/.well-known/opencode`,
}),
)
log.debug("loaded remote config from well-known", { url })
}
}
// Global user config overrides remote config.
result = mergeConfigConcatArrays(result, await global())
// Custom config path overrides global config.
if (Flag.OPENCODE_CONFIG) {
result = mergeConfigConcatArrays(result, await loadFile(Flag.OPENCODE_CONFIG))
log.debug("loaded custom config", { path: Flag.OPENCODE_CONFIG })
}
// Project config overrides global and remote config.
if (!Flag.OPENCODE_DISABLE_PROJECT_CONFIG) {
for (const file of await ConfigPaths.projectFiles("opencode", Instance.directory, Instance.worktree)) {
result = mergeConfigConcatArrays(result, await loadFile(file))
}
}
result.agent = result.agent || {}
result.mode = result.mode || {}
result.plugin = result.plugin || []
const directories = await ConfigPaths.directories(Instance.directory, Instance.worktree)
// .opencode directory config overrides (project and global) config sources.
if (Flag.OPENCODE_CONFIG_DIR) {
log.debug("loading config from OPENCODE_CONFIG_DIR", { path: Flag.OPENCODE_CONFIG_DIR })
}
const deps = []
for (const dir of unique(directories)) {
if (dir.endsWith(".opencode") || dir === Flag.OPENCODE_CONFIG_DIR) {
for (const file of ["opencode.jsonc", "opencode.json"]) {
log.debug(`loading config from ${path.join(dir, file)}`)
result = mergeConfigConcatArrays(result, await loadFile(path.join(dir, file)))
// to satisfy the type checker
result.agent ??= {}
result.mode ??= {}
result.plugin ??= []
}
}
deps.push(
iife(async () => {
const shouldInstall = await needsInstall(dir)
if (shouldInstall) await installDependencies(dir)
}),
)
result.command = mergeDeep(result.command ?? {}, await loadCommand(dir))
result.agent = mergeDeep(result.agent, await loadAgent(dir))
result.agent = mergeDeep(result.agent, await loadMode(dir))
result.plugin.push(...(await loadPlugin(dir)))
}
// Inline config content overrides all non-managed config sources.
if (process.env.OPENCODE_CONFIG_CONTENT) {
result = mergeConfigConcatArrays(
result,
await load(process.env.OPENCODE_CONFIG_CONTENT, {
dir: Instance.directory,
source: "OPENCODE_CONFIG_CONTENT",
}),
)
log.debug("loaded custom config from OPENCODE_CONFIG_CONTENT")
}
const active = await Account.active()
if (active?.active_org_id) {
try {
const [config, token] = await Promise.all([
Account.config(active.id, active.active_org_id),
Account.token(active.id),
])
if (token) {
process.env["OPENCODE_CONSOLE_TOKEN"] = token
Env.set("OPENCODE_CONSOLE_TOKEN", token)
}
if (config) {
result = mergeConfigConcatArrays(
result,
await load(JSON.stringify(config), {
dir: path.dirname(`${active.url}/api/config`),
source: `${active.url}/api/config`,
}),
)
}
} catch (err: any) {
log.debug("failed to fetch remote account config", { error: err?.message ?? err })
}
}
// Load managed config files last (highest priority) - enterprise admin-controlled
// Kept separate from directories array to avoid write operations when installing plugins
// which would fail on system directories requiring elevated permissions
// This way it only loads config file and not skills/plugins/commands
if (existsSync(managedDir)) {
for (const file of ["opencode.jsonc", "opencode.json"]) {
result = mergeConfigConcatArrays(result, await loadFile(path.join(managedDir, file)))
}
}
// Migrate deprecated mode field to agent field
for (const [name, mode] of Object.entries(result.mode ?? {})) {
result.agent = mergeDeep(result.agent ?? {}, {
[name]: {
...mode,
mode: "primary" as const,
},
})
}
if (Flag.OPENCODE_PERMISSION) {
result.permission = mergeDeep(result.permission ?? {}, JSON.parse(Flag.OPENCODE_PERMISSION))
}
// Backwards compatibility: legacy top-level `tools` config
if (result.tools) {
const perms: Record<string, Config.PermissionAction> = {}
for (const [tool, enabled] of Object.entries(result.tools)) {
const action: Config.PermissionAction = enabled ? "allow" : "deny"
if (tool === "write" || tool === "edit" || tool === "patch" || tool === "multiedit") {
perms.edit = action
continue
}
perms[tool] = action
}
result.permission = mergeDeep(perms, result.permission ?? {})
}
if (!result.username) result.username = os.userInfo().username
// Handle migration from autoshare to share field
if (result.autoshare === true && !result.share) {
result.share = "auto"
}
// Apply flag overrides for compaction settings
if (Flag.OPENCODE_DISABLE_AUTOCOMPACT) {
result.compaction = { ...result.compaction, auto: false }
}
if (Flag.OPENCODE_DISABLE_PRUNE) {
result.compaction = { ...result.compaction, prune: false }
}
result.plugin = deduplicatePlugins(result.plugin ?? [])
return {
config: result,
directories,
deps,
}
})
export async function waitForDependencies() {
const deps = await state().then((x) => x.deps)
await Promise.all(deps)
}
export async function installDependencies(dir: string) {
const pkg = path.join(dir, "package.json")
const targetVersion = Installation.isLocal() ? "*" : Installation.VERSION
@@ -133,7 +325,7 @@ export namespace Config {
async function isWritable(dir: string) {
try {
await fsNode.access(dir, constants.W_OK)
await fs.access(dir, constants.W_OK)
return true
} catch {
return false
@@ -1042,23 +1234,123 @@ export namespace Config {
export type Info = z.output<typeof Info>
type State = {
config: Info
directories: string[]
deps: Promise<void>[]
export const global = lazy(async () => {
let result: Info = pipe(
{},
mergeDeep(await loadFile(path.join(Global.Path.config, "config.json"))),
mergeDeep(await loadFile(path.join(Global.Path.config, "opencode.json"))),
mergeDeep(await loadFile(path.join(Global.Path.config, "opencode.jsonc"))),
)
const legacy = path.join(Global.Path.config, "config")
if (existsSync(legacy)) {
await import(pathToFileURL(legacy).href, {
with: {
type: "toml",
},
})
.then(async (mod) => {
const { provider, model, ...rest } = mod.default
if (provider && model) result.model = `${provider}/${model}`
result["$schema"] = "https://opencode.ai/config.json"
result = mergeDeep(result, rest)
await Filesystem.writeJson(path.join(Global.Path.config, "config.json"), result)
await fs.unlink(legacy)
})
.catch(() => {})
}
return result
})
export const { readFile } = ConfigPaths
async function loadFile(filepath: string): Promise<Info> {
log.info("loading", { path: filepath })
const text = await readFile(filepath)
if (!text) return {}
return load(text, { path: filepath })
}
export interface Interface {
readonly get: () => Effect.Effect<Info>
readonly getGlobal: () => Effect.Effect<Info>
readonly update: (config: Info) => Effect.Effect<void>
readonly updateGlobal: (config: Info) => Effect.Effect<Info>
readonly invalidate: (wait?: boolean) => Effect.Effect<void>
readonly directories: () => Effect.Effect<string[]>
readonly waitForDependencies: () => Effect.Effect<void>
async function load(text: string, options: { path: string } | { dir: string; source: string }) {
const original = text
const source = "path" in options ? options.path : options.source
const isFile = "path" in options
const data = await ConfigPaths.parseText(
text,
"path" in options ? options.path : { source: options.source, dir: options.dir },
)
const normalized = (() => {
if (!data || typeof data !== "object" || Array.isArray(data)) return data
const copy = { ...(data as Record<string, unknown>) }
const hadLegacy = "theme" in copy || "keybinds" in copy || "tui" in copy
if (!hadLegacy) return copy
delete copy.theme
delete copy.keybinds
delete copy.tui
log.warn("tui keys in opencode config are deprecated; move them to tui.json", { path: source })
return copy
})()
const parsed = Info.safeParse(normalized)
if (parsed.success) {
if (!parsed.data.$schema && isFile) {
parsed.data.$schema = "https://opencode.ai/config.json"
const updated = original.replace(/^\s*\{/, '{\n "$schema": "https://opencode.ai/config.json",')
await Filesystem.write(options.path, updated).catch(() => {})
}
const data = parsed.data
if (data.plugin && isFile) {
for (let i = 0; i < data.plugin.length; i++) {
const plugin = data.plugin[i]
try {
data.plugin[i] = import.meta.resolve!(plugin, options.path)
} catch (e) {
try {
// import.meta.resolve sometimes fails with newly created node_modules
const require = createRequire(options.path)
const resolvedPath = require.resolve(plugin)
data.plugin[i] = pathToFileURL(resolvedPath).href
} catch {
// Ignore, plugin might be a generic string identifier like "mcp-server"
}
}
}
}
return data
}
throw new InvalidError({
path: source,
issues: parsed.error.issues,
})
}
export const { JsonError, InvalidError } = ConfigPaths
export const ConfigDirectoryTypoError = NamedError.create(
"ConfigDirectoryTypoError",
z.object({
path: z.string(),
dir: z.string(),
suggestion: z.string(),
}),
)
export async function get() {
return state().then((x) => x.config)
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Config") {}
export async function getGlobal() {
return global()
}
export async function update(config: Info) {
const filepath = path.join(Instance.directory, "config.json")
const existing = await loadFile(filepath)
await Filesystem.writeJson(filepath, mergeDeep(existing, config))
await Instance.dispose()
}
function globalConfigFile() {
const candidates = ["opencode.jsonc", "opencode.json", "config.json"].map((file) =>
@@ -1125,413 +1417,47 @@ export namespace Config {
})
}
export const { JsonError, InvalidError } = ConfigPaths
export async function updateGlobal(config: Info) {
const filepath = globalConfigFile()
const before = await Filesystem.readText(filepath).catch((err: any) => {
if (err.code === "ENOENT") return "{}"
throw new JsonError({ path: filepath }, { cause: err })
})
export const ConfigDirectoryTypoError = NamedError.create(
"ConfigDirectoryTypoError",
z.object({
path: z.string(),
dir: z.string(),
suggestion: z.string(),
}),
)
const next = await (async () => {
if (!filepath.endsWith(".jsonc")) {
const existing = parseConfig(before, filepath)
const merged = mergeDeep(existing, config)
await Filesystem.writeJson(filepath, merged)
return merged
}
export const layer: Layer.Layer<Service, never, AppFileSystem.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const updated = patchJsonc(before, config)
const merged = parseConfig(updated, filepath)
await Filesystem.write(filepath, updated)
return merged
})()
const readConfigFile = Effect.fnUntraced(function* (filepath: string) {
return yield* fs.readFileString(filepath).pipe(
Effect.catchIf(
(e) => e.reason._tag === "NotFound",
() => Effect.succeed(undefined),
),
Effect.orDie,
)
})
global.reset()
const loadConfig = Effect.fnUntraced(function* (
text: string,
options: { path: string } | { dir: string; source: string },
) {
const original = text
const source = "path" in options ? options.path : options.source
const isFile = "path" in options
const data = yield* Effect.promise(() =>
ConfigPaths.parseText(text, "path" in options ? options.path : { source: options.source, dir: options.dir }),
)
const normalized = (() => {
if (!data || typeof data !== "object" || Array.isArray(data)) return data
const copy = { ...(data as Record<string, unknown>) }
const hadLegacy = "theme" in copy || "keybinds" in copy || "tui" in copy
if (!hadLegacy) return copy
delete copy.theme
delete copy.keybinds
delete copy.tui
log.warn("tui keys in opencode config are deprecated; move them to tui.json", { path: source })
return copy
})()
const parsed = Info.safeParse(normalized)
if (parsed.success) {
if (!parsed.data.$schema && isFile) {
parsed.data.$schema = "https://opencode.ai/config.json"
const updated = original.replace(/^\s*\{/, '{\n "$schema": "https://opencode.ai/config.json",')
yield* fs.writeFileString(options.path, updated).pipe(Effect.catch(() => Effect.void))
}
const data = parsed.data
if (data.plugin && isFile) {
for (let i = 0; i < data.plugin.length; i++) {
const plugin = data.plugin[i]
try {
data.plugin[i] = import.meta.resolve!(plugin, options.path)
} catch (e) {
try {
const require = createRequire(options.path)
const resolvedPath = require.resolve(plugin)
data.plugin[i] = pathToFileURL(resolvedPath).href
} catch {
// Ignore, plugin might be a generic string identifier like "mcp-server"
}
}
}
}
return data
}
throw new InvalidError({
path: source,
issues: parsed.error.issues,
void Instance.disposeAll()
.catch(() => undefined)
.finally(() => {
GlobalBus.emit("event", {
directory: "global",
payload: {
type: Event.Disposed.type,
properties: {},
},
})
})
const loadFile = Effect.fnUntraced(function* (filepath: string) {
log.info("loading", { path: filepath })
const text = yield* readConfigFile(filepath)
if (!text) return {} as Info
return yield* loadConfig(text, { path: filepath })
})
const loadGlobal = Effect.fnUntraced(function* () {
let result: Info = pipe(
{},
mergeDeep(yield* loadFile(path.join(Global.Path.config, "config.json"))),
mergeDeep(yield* loadFile(path.join(Global.Path.config, "opencode.json"))),
mergeDeep(yield* loadFile(path.join(Global.Path.config, "opencode.jsonc"))),
)
const legacy = path.join(Global.Path.config, "config")
if (existsSync(legacy)) {
yield* Effect.promise(() =>
import(pathToFileURL(legacy).href, { with: { type: "toml" } })
.then(async (mod) => {
const { provider, model, ...rest } = mod.default
if (provider && model) result.model = `${provider}/${model}`
result["$schema"] = "https://opencode.ai/config.json"
result = mergeDeep(result, rest)
await fsNode.writeFile(
path.join(Global.Path.config, "config.json"),
JSON.stringify(result, null, 2),
)
await fsNode.unlink(legacy)
})
.catch(() => {}),
)
}
return result
})
let cachedGlobal = yield* Effect.cached(
loadGlobal().pipe(Effect.orElseSucceed(() => ({}) as Info)),
)
const getGlobal = Effect.fn("Config.getGlobal")(function* () {
return yield* cachedGlobal
})
const loadInstanceState = Effect.fnUntraced(function* (ctx: InstanceContext) {
const auth = yield* Effect.promise(() => Auth.all())
let result: Info = {}
for (const [key, value] of Object.entries(auth)) {
if (value.type === "wellknown") {
const url = key.replace(/\/+$/, "")
process.env[value.key] = value.token
log.debug("fetching remote config", { url: `${url}/.well-known/opencode` })
const response = yield* Effect.promise(() => fetch(`${url}/.well-known/opencode`))
if (!response.ok) {
throw new Error(`failed to fetch remote config from ${url}: ${response.status}`)
}
const wellknown = (yield* Effect.promise(() => response.json())) as any
const remoteConfig = wellknown.config ?? {}
if (!remoteConfig.$schema) remoteConfig.$schema = "https://opencode.ai/config.json"
result = mergeConfigConcatArrays(
result,
yield* loadConfig(JSON.stringify(remoteConfig), {
dir: path.dirname(`${url}/.well-known/opencode`),
source: `${url}/.well-known/opencode`,
}),
)
log.debug("loaded remote config from well-known", { url })
}
}
result = mergeConfigConcatArrays(result, yield* getGlobal())
if (Flag.OPENCODE_CONFIG) {
result = mergeConfigConcatArrays(result, yield* loadFile(Flag.OPENCODE_CONFIG))
log.debug("loaded custom config", { path: Flag.OPENCODE_CONFIG })
}
if (!Flag.OPENCODE_DISABLE_PROJECT_CONFIG) {
for (const file of yield* Effect.promise(() =>
ConfigPaths.projectFiles("opencode", ctx.directory, ctx.worktree),
)) {
result = mergeConfigConcatArrays(result, yield* loadFile(file))
}
}
result.agent = result.agent || {}
result.mode = result.mode || {}
result.plugin = result.plugin || []
const directories = yield* Effect.promise(() => ConfigPaths.directories(ctx.directory, ctx.worktree))
if (Flag.OPENCODE_CONFIG_DIR) {
log.debug("loading config from OPENCODE_CONFIG_DIR", { path: Flag.OPENCODE_CONFIG_DIR })
}
const deps: Promise<void>[] = []
for (const dir of unique(directories)) {
if (dir.endsWith(".opencode") || dir === Flag.OPENCODE_CONFIG_DIR) {
for (const file of ["opencode.jsonc", "opencode.json"]) {
log.debug(`loading config from ${path.join(dir, file)}`)
result = mergeConfigConcatArrays(result, yield* loadFile(path.join(dir, file)))
result.agent ??= {}
result.mode ??= {}
result.plugin ??= []
}
}
deps.push(
iife(async () => {
const shouldInstall = await needsInstall(dir)
if (shouldInstall) await installDependencies(dir)
}),
)
result.command = mergeDeep(result.command ?? {}, yield* Effect.promise(() => loadCommand(dir)))
result.agent = mergeDeep(result.agent, yield* Effect.promise(() => loadAgent(dir)))
result.agent = mergeDeep(result.agent, yield* Effect.promise(() => loadMode(dir)))
result.plugin.push(...(yield* Effect.promise(() => loadPlugin(dir))))
}
if (process.env.OPENCODE_CONFIG_CONTENT) {
result = mergeConfigConcatArrays(
result,
yield* loadConfig(process.env.OPENCODE_CONFIG_CONTENT, {
dir: ctx.directory,
source: "OPENCODE_CONFIG_CONTENT",
}),
)
log.debug("loaded custom config from OPENCODE_CONFIG_CONTENT")
}
const active = yield* Effect.promise(() => Account.active())
if (active?.active_org_id) {
yield* Effect.gen(function* () {
const [config, token] = yield* Effect.promise(() =>
Promise.all([Account.config(active.id, active.active_org_id!), Account.token(active.id)]),
)
if (token) {
process.env["OPENCODE_CONSOLE_TOKEN"] = token
Env.set("OPENCODE_CONSOLE_TOKEN", token)
}
if (config) {
result = mergeConfigConcatArrays(
result,
yield* loadConfig(JSON.stringify(config), {
dir: path.dirname(`${active.url}/api/config`),
source: `${active.url}/api/config`,
}),
)
}
}).pipe(
Effect.catchDefect((err) => {
log.debug("failed to fetch remote account config", {
error: err instanceof Error ? err.message : String(err),
})
return Effect.void
}),
)
}
if (existsSync(managedDir)) {
for (const file of ["opencode.jsonc", "opencode.json"]) {
result = mergeConfigConcatArrays(result, yield* loadFile(path.join(managedDir, file)))
}
}
for (const [name, mode] of Object.entries(result.mode ?? {})) {
result.agent = mergeDeep(result.agent ?? {}, {
[name]: {
...mode,
mode: "primary" as const,
},
})
}
if (Flag.OPENCODE_PERMISSION) {
result.permission = mergeDeep(result.permission ?? {}, JSON.parse(Flag.OPENCODE_PERMISSION))
}
if (result.tools) {
const perms: Record<string, Config.PermissionAction> = {}
for (const [tool, enabled] of Object.entries(result.tools)) {
const action: Config.PermissionAction = enabled ? "allow" : "deny"
if (tool === "write" || tool === "edit" || tool === "patch" || tool === "multiedit") {
perms.edit = action
continue
}
perms[tool] = action
}
result.permission = mergeDeep(perms, result.permission ?? {})
}
if (!result.username) result.username = os.userInfo().username
if (result.autoshare === true && !result.share) {
result.share = "auto"
}
if (Flag.OPENCODE_DISABLE_AUTOCOMPACT) {
result.compaction = { ...result.compaction, auto: false }
}
if (Flag.OPENCODE_DISABLE_PRUNE) {
result.compaction = { ...result.compaction, prune: false }
}
result.plugin = deduplicatePlugins(result.plugin ?? [])
return {
config: result,
directories,
deps,
}
})
const state = yield* InstanceState.make<State>(
Effect.fn("Config.state")(function* (ctx) {
return yield* loadInstanceState(ctx)
}),
)
const get = Effect.fn("Config.get")(function* () {
return yield* InstanceState.use(state, (s) => s.config)
})
const directories = Effect.fn("Config.directories")(function* () {
return yield* InstanceState.use(state, (s) => s.directories)
})
const waitForDependencies = Effect.fn("Config.waitForDependencies")(function* () {
yield* InstanceState.useEffect(state, (s) =>
Effect.promise(() => Promise.all(s.deps).then(() => undefined)),
)
})
const update = Effect.fn("Config.update")(function* (config: Info) {
const file = path.join(Instance.directory, "config.json")
const existing = yield* loadFile(file)
yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie)
yield* Effect.promise(() => Instance.dispose())
})
const invalidate = Effect.fn("Config.invalidate")(function* (wait?: boolean) {
cachedGlobal = yield* Effect.cached(
loadGlobal().pipe(Effect.orElseSucceed(() => ({}) as Info)),
)
const task = Instance.disposeAll()
.catch(() => undefined)
.finally(() =>
GlobalBus.emit("event", {
directory: "global",
payload: {
type: Event.Disposed.type,
properties: {},
},
}),
)
if (wait) yield* Effect.promise(() => task)
else void task
})
const updateGlobal = Effect.fn("Config.updateGlobal")(function* (config: Info) {
const file = globalConfigFile()
const before = (yield* readConfigFile(file)) ?? "{}"
let next: Info
if (!file.endsWith(".jsonc")) {
const existing = parseConfig(before, file)
const merged = mergeDeep(existing, config)
yield* fs.writeFileString(file, JSON.stringify(merged, null, 2)).pipe(Effect.orDie)
next = merged
} else {
const updated = patchJsonc(before, config)
next = parseConfig(updated, file)
yield* fs.writeFileString(file, updated).pipe(Effect.orDie)
}
yield* invalidate()
return next
})
return Service.of({
get,
getGlobal,
update,
updateGlobal,
invalidate,
directories,
waitForDependencies,
})
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
const runPromise = makeRunPromise(Service, defaultLayer)
export async function get() {
return runPromise((svc) => svc.get())
}
export async function getGlobal() {
return runPromise((svc) => svc.getGlobal())
}
export async function update(config: Info) {
return runPromise((svc) => svc.update(config))
}
export async function updateGlobal(config: Info) {
return runPromise((svc) => svc.updateGlobal(config))
}
export async function invalidate(wait = false) {
return runPromise((svc) => svc.invalidate(wait))
return next
}
export async function directories() {
return runPromise((svc) => svc.directories())
}
export async function waitForDependencies() {
return runPromise((svc) => svc.waitForDependencies())
return state().then((x) => x.directories)
}
}
Filesystem.write
Filesystem.write

View File

@@ -1,5 +1,5 @@
import { Effect, ScopedCache, Scope } from "effect"
import { Instance, type InstanceContext } from "@/project/instance"
import { Instance, type Shape } from "@/project/instance"
import { registerDisposer } from "./instance-registry"
const TypeId = "~opencode/InstanceState"
@@ -11,7 +11,7 @@ export interface InstanceState<A, E = never, R = never> {
export namespace InstanceState {
export const make = <A, E = never, R = never>(
init: (ctx: InstanceContext) => Effect.Effect<A, E, R | Scope.Scope>,
init: (ctx: Shape) => Effect.Effect<A, E, R | Scope.Scope>,
): Effect.Effect<InstanceState<A, E, Exclude<R, Scope.Scope>>, never, R | Scope.Scope> =>
Effect.gen(function* () {
const cache = yield* ScopedCache.make<string, A, E, R>({

View File

@@ -3,11 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
export const memoMap = Layer.makeMemoMapUnsafe()
export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
rt ??= ManagedRuntime.make(layer, { memoMap })
return rt.runPromise(service.use(fn), options)
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
}
}

View File

@@ -1,8 +1,8 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Git } from "@/git"
import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap } from "effect"
import { formatPatch, structuredPatch } from "diff"
import fs from "fs"
import fuzzysort from "fuzzysort"
@@ -323,7 +323,6 @@ export namespace File {
interface State {
cache: Entry
fiber: Fiber.Fiber<void> | undefined
}
export interface Interface {
@@ -348,7 +347,6 @@ export namespace File {
Effect.fn("File.state")(() =>
Effect.succeed({
cache: { files: [], dirs: [] } as Entry,
fiber: undefined as Fiber.Fiber<void> | undefined,
}),
),
)
@@ -406,21 +404,11 @@ export namespace File {
s.cache = next
})
const scope = yield* Scope.Scope
let cachedScan = yield* Effect.cached(scan().pipe(Effect.catchCause(() => Effect.void)))
const ensure = Effect.fn("File.ensure")(function* () {
const s = yield* InstanceState.get(state)
if (!s.fiber)
s.fiber = yield* scan().pipe(
Effect.catchCause(() => Effect.void),
Effect.ensuring(
Effect.sync(() => {
s.fiber = undefined
}),
),
Effect.forkIn(scope),
)
yield* Fiber.join(s.fiber)
yield* cachedScan
cachedScan = yield* Effect.cached(scan().pipe(Effect.catchCause(() => Effect.void)))
})
const init = Effect.fn("File.init")(function* () {
@@ -688,7 +676,7 @@ export namespace File {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -1,6 +1,6 @@
import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import type { SessionID } from "@/session/schema"
import { Filesystem } from "../util/filesystem"
@@ -108,7 +108,7 @@ export namespace FileTime {
}),
).pipe(Layer.orDie)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function read(sessionID: SessionID, file: string) {
return runPromise((s) => s.read(sessionID, file))

View File

@@ -8,7 +8,7 @@ import z from "zod"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Git } from "@/git"
import { Instance } from "@/project/instance"
@@ -159,7 +159,7 @@ export namespace FileWatcher {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -1,12 +1,10 @@
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import path from "path"
import { mergeDeep } from "remeda"
import z from "zod"
import { Bus } from "../bus"
import { Config } from "../config/config"
import { File } from "../file"
import { Instance } from "../project/instance"
import { Process } from "../util/process"
import { Log } from "../util/log"
@@ -29,6 +27,7 @@ export namespace Format {
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
readonly file: (filepath: string) => Effect.Effect<void>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Format") {}
@@ -97,53 +96,46 @@ export namespace Format {
return checks.filter((x) => x.enabled).map((x) => x.item)
}
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
File.Event.Edited,
Instance.bind(async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
async function formatFile(filepath: string) {
log.info("formatting", { file: filepath })
const ext = path.extname(filepath)
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", filepath)),
{
cwd: Instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file: filepath,
})
}
}
}
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", file)),
{
cwd: Instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file,
})
}
}
}),
),
),
(unsubscribe) => Effect.sync(unsubscribe),
)
log.info("init")
return {
formatters,
isEnabled,
formatFile,
}
}),
)
@@ -166,11 +158,16 @@ export namespace Format {
return result
})
return Service.of({ init, status })
const file = Effect.fn("Format.file")(function* (filepath: string) {
const { formatFile } = yield* InstanceState.get(state)
yield* Effect.promise(() => formatFile(filepath))
})
return Service.of({ init, status, file })
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function init() {
return runPromise((s) => s.init())
@@ -179,4 +176,8 @@ export namespace Format {
export async function status() {
return runPromise((s) => s.status())
}
export async function file(filepath: string) {
return runPromise((s) => s.file(filepath))
}
}

View File

@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace Git {
const cfg = [
@@ -264,7 +264,7 @@ export namespace Git {
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export function run(args: string[], opts: Options) {
return runPromise((git) => git.run(args, opts))

View File

@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import path from "path"
@@ -346,7 +346,7 @@ export namespace Installation {
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function info(): Promise<Info> {
return runPromise((svc) => svc.info())

View File

@@ -11,6 +11,9 @@ import { Instance } from "../project/instance"
import { Flag } from "@/flag/flag"
import { Process } from "../util/process"
import { spawn as lspspawn } from "./launch"
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
export namespace LSP {
const log = Log.create({ service: "lsp" })
@@ -62,92 +65,6 @@ export namespace LSP {
})
export type DocumentSymbol = z.infer<typeof DocumentSymbol>
const filterExperimentalServers = (servers: Record<string, LSPServer.Info>) => {
if (Flag.OPENCODE_EXPERIMENTAL_LSP_TY) {
// If experimental flag is enabled, disable pyright
if (servers["pyright"]) {
log.info("LSP server pyright is disabled because OPENCODE_EXPERIMENTAL_LSP_TY is enabled")
delete servers["pyright"]
}
} else {
// If experimental flag is disabled, disable ty
if (servers["ty"]) {
delete servers["ty"]
}
}
}
const state = Instance.state(
async () => {
const clients: LSPClient.Info[] = []
const servers: Record<string, LSPServer.Info> = {}
const cfg = await Config.get()
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
}
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => Instance.directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => {
return {
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: {
...process.env,
...item.env,
},
}),
initialization: item.initialization,
}
},
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
},
async (state) => {
await Promise.all(state.clients.map((client) => client.shutdown()))
},
)
export async function init() {
return state()
}
export const Status = z
.object({
id: z.string(),
@@ -160,168 +77,6 @@ export namespace LSP {
})
export type Status = z.infer<typeof Status>
export async function status() {
return state().then((x) => {
const result: Status[] = []
for (const client of x.clients) {
result.push({
id: client.serverID,
name: x.servers[client.serverID].id,
root: path.relative(Instance.directory, client.root),
status: "connected",
})
}
return result
})
}
async function getClients(file: string) {
const s = await state()
// Only spawn LSP clients for files within the instance directory
if (!Instance.containsPath(file)) {
return []
}
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) s.broken.add(key)
return value
})
.catch((err) => {
s.broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
s.broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) {
return undefined
}
const existing = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
s.clients.push(client)
return client
}
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
const match = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = s.spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
s.spawning.set(root + server.id, task)
task.finally(() => {
if (s.spawning.get(root + server.id) === task) {
s.spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
}
export async function hasClients(file: string) {
const s = await state()
const extension = path.parse(file).ext || file
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
return true
}
return false
}
export async function touchFile(input: string, waitForDiagnostics?: boolean) {
log.info("touching file", { file: input })
const clients = await getClients(input)
await Promise.all(
clients.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
})
}
export async function diagnostics() {
const results: Record<string, LSPClient.Diagnostic[]> = {}
for (const result of await runAll(async (client) => client.diagnostics)) {
for (const [path, diagnostics] of result.entries()) {
const arr = results[path] || []
arr.push(...diagnostics)
results[path] = arr
}
}
return results
}
export async function hover(input: { file: string; line: number; character: number }) {
return run(input.file, (client) => {
return client.connection
.sendRequest("textDocument/hover", {
textDocument: {
uri: pathToFileURL(input.file).href,
},
position: {
line: input.line,
character: input.character,
},
})
.catch(() => null)
})
}
enum SymbolKind {
File = 1,
Module = 2,
@@ -362,115 +117,423 @@ export namespace LSP {
SymbolKind.Enum,
]
export async function workspaceSymbol(query: string) {
return runAll((client) =>
client.connection
.sendRequest("workspace/symbol", {
query,
const filterExperimentalServers = (servers: Record<string, LSPServer.Info>) => {
if (Flag.OPENCODE_EXPERIMENTAL_LSP_TY) {
if (servers["pyright"]) {
log.info("LSP server pyright is disabled because OPENCODE_EXPERIMENTAL_LSP_TY is enabled")
delete servers["pyright"]
}
} else {
if (servers["ty"]) {
delete servers["ty"]
}
}
}
type LocInput = { file: string; line: number; character: number }
interface State {
clients: LSPClient.Info[]
servers: Record<string, LSPServer.Info>
broken: Set<string>
spawning: Map<string, Promise<LSPClient.Info | undefined>>
}
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
readonly hasClients: (file: string) => Effect.Effect<boolean>
readonly touchFile: (input: string, waitForDiagnostics?: boolean) => Effect.Effect<void>
readonly diagnostics: () => Effect.Effect<Record<string, LSPClient.Diagnostic[]>>
readonly hover: (input: LocInput) => Effect.Effect<any>
readonly definition: (input: LocInput) => Effect.Effect<any[]>
readonly references: (input: LocInput) => Effect.Effect<any[]>
readonly implementation: (input: LocInput) => Effect.Effect<any[]>
readonly documentSymbol: (uri: string) => Effect.Effect<(LSP.DocumentSymbol | LSP.Symbol)[]>
readonly workspaceSymbol: (query: string) => Effect.Effect<LSP.Symbol[]>
readonly prepareCallHierarchy: (input: LocInput) => Effect.Effect<any[]>
readonly incomingCalls: (input: LocInput) => Effect.Effect<any[]>
readonly outgoingCalls: (input: LocInput) => Effect.Effect<any[]>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LSP") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const cache = yield* InstanceState.make<State>(
Effect.fn("LSP.state")(function* () {
const cfg = yield* Effect.promise(() => Config.get())
const servers: Record<string, LSPServer.Info> = {}
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
} else {
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => Instance.directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => ({
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: { ...process.env, ...item.env },
}),
initialization: item.initialization,
}),
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
}
const s: State = {
clients: [],
servers,
broken: new Set(),
spawning: new Map(),
}
yield* Effect.addFinalizer(() =>
Effect.promise(async () => {
await Promise.all(s.clients.map((client) => client.shutdown()))
}),
)
return s
}),
)
const getClients = Effect.fnUntraced(function* (file: string) {
if (!Instance.containsPath(file)) return [] as LSPClient.Info[]
const s = yield* InstanceState.get(cache)
return yield* Effect.promise(async () => {
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) s.broken.add(key)
return value
})
.catch((err) => {
s.broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
s.broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) return undefined
const existing = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
s.clients.push(client)
return client
}
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
const match = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = s.spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
s.spawning.set(root + server.id, task)
task.finally(() => {
if (s.spawning.get(root + server.id) === task) {
s.spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
})
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
).then((result) => result.flat() as LSP.Symbol[])
}
})
export async function documentSymbol(uri: string) {
const file = fileURLToPath(uri)
return run(file, (client) =>
client.connection
.sendRequest("textDocument/documentSymbol", {
textDocument: {
uri,
},
const runForFile = Effect.fnUntraced(function* <T>(file: string, fn: (client: LSPClient.Info) => Promise<T>) {
const clients = yield* getClients(file)
return yield* Effect.promise(() => Promise.all(clients.map((x) => fn(x))))
})
const runForAll = Effect.fnUntraced(function* <T>(fn: (client: LSPClient.Info) => Promise<T>) {
const s = yield* InstanceState.get(cache)
return yield* Effect.promise(() => Promise.all(s.clients.map((x) => fn(x))))
})
const init = Effect.fn("LSP.init")(function* () {
yield* InstanceState.get(cache)
})
const status = Effect.fn("LSP.status")(function* () {
const s = yield* InstanceState.get(cache)
const result: Status[] = []
for (const client of s.clients) {
result.push({
id: client.serverID,
name: s.servers[client.serverID].id,
root: path.relative(Instance.directory, client.root),
status: "connected",
})
}
return result
})
const hasClients = Effect.fn("LSP.hasClients")(function* (file: string) {
const s = yield* InstanceState.get(cache)
return yield* Effect.promise(async () => {
const extension = path.parse(file).ext || file
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
return true
}
return false
})
.catch(() => []),
)
.then((result) => result.flat() as (LSP.DocumentSymbol | LSP.Symbol)[])
.then((result) => result.filter(Boolean))
}
})
export async function definition(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
const touchFile = Effect.fn("LSP.touchFile")(function* (input: string, waitForDiagnostics?: boolean) {
log.info("touching file", { file: input })
const clients = yield* getClients(input)
yield* Effect.promise(() =>
Promise.all(
clients.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
}),
)
})
const diagnostics = Effect.fn("LSP.diagnostics")(function* () {
const results: Record<string, LSPClient.Diagnostic[]> = {}
const all = yield* runForAll(async (client) => client.diagnostics)
for (const result of all) {
for (const [p, diags] of result.entries()) {
const arr = results[p] || []
arr.push(...diags)
results[p] = arr
}
}
return results
})
const hover = Effect.fn("LSP.hover")(function* (input: LocInput) {
return yield* runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/hover", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
)
})
const definition = Effect.fn("LSP.definition")(function* (input: LocInput) {
const results = yield* runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
)
return results.flat().filter(Boolean)
})
const references = Effect.fn("LSP.references")(function* (input: LocInput) {
const results = yield* runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
)
return results.flat().filter(Boolean)
})
const implementation = Effect.fn("LSP.implementation")(function* (input: LocInput) {
const results = yield* runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
)
return results.flat().filter(Boolean)
})
const documentSymbol = Effect.fn("LSP.documentSymbol")(function* (uri: string) {
const file = fileURLToPath(uri)
const results = yield* runForFile(file, (client) =>
client.connection.sendRequest("textDocument/documentSymbol", { textDocument: { uri } }).catch(() => []),
)
return (results.flat() as (LSP.DocumentSymbol | LSP.Symbol)[]).filter(Boolean)
})
const workspaceSymbol = Effect.fn("LSP.workspaceSymbol")(function* (query: string) {
const results = yield* runForAll((client) =>
client.connection
.sendRequest("workspace/symbol", { query })
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
)
return results.flat() as LSP.Symbol[]
})
const prepareCallHierarchy = Effect.fn("LSP.prepareCallHierarchy")(function* (input: LocInput) {
const results = yield* runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
)
return results.flat().filter(Boolean)
})
const callHierarchyRequest = Effect.fnUntraced(function* (
input: LocInput,
direction: "callHierarchy/incomingCalls" | "callHierarchy/outgoingCalls",
) {
const results = yield* runForFile(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest(direction, { item: items[0] }).catch(() => [])
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
}
return results.flat().filter(Boolean)
})
export async function references(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
}
const incomingCalls = Effect.fn("LSP.incomingCalls")(function* (input: LocInput) {
return yield* callHierarchyRequest(input, "callHierarchy/incomingCalls")
})
export async function implementation(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
}
const outgoingCalls = Effect.fn("LSP.outgoingCalls")(function* (input: LocInput) {
return yield* callHierarchyRequest(input, "callHierarchy/outgoingCalls")
})
export async function prepareCallHierarchy(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
}
return Service.of({
init,
status,
hasClients,
touchFile,
diagnostics,
hover,
definition,
references,
implementation,
documentSymbol,
workspaceSymbol,
prepareCallHierarchy,
incomingCalls,
outgoingCalls,
})
}),
)
export async function incomingCalls(input: { file: string; line: number; character: number }) {
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/incomingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
}
const { runPromise } = makeRuntime(Service, layer)
export async function outgoingCalls(input: { file: string; line: number; character: number }) {
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/outgoingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
}
export const init = async () => runPromise((svc) => svc.init())
async function runAll<T>(input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await state().then((x) => x.clients)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
export const status = async () => runPromise((svc) => svc.status())
async function run<T>(file: string, input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await getClients(file)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
export const hasClients = async (file: string) => runPromise((svc) => svc.hasClients(file))
export const touchFile = async (input: string, waitForDiagnostics?: boolean) =>
runPromise((svc) => svc.touchFile(input, waitForDiagnostics))
export const diagnostics = async () => runPromise((svc) => svc.diagnostics())
export const hover = async (input: LocInput) => runPromise((svc) => svc.hover(input))
export const definition = async (input: LocInput) => runPromise((svc) => svc.definition(input))
export const references = async (input: LocInput) => runPromise((svc) => svc.references(input))
export const implementation = async (input: LocInput) => runPromise((svc) => svc.implementation(input))
export const documentSymbol = async (uri: string) => runPromise((svc) => svc.documentSymbol(uri))
export const workspaceSymbol = async (query: string) => runPromise((svc) => svc.workspaceSymbol(query))
export const prepareCallHierarchy = async (input: LocInput) => runPromise((svc) => svc.prepareCallHierarchy(input))
export const incomingCalls = async (input: LocInput) => runPromise((svc) => svc.incomingCalls(input))
export const outgoingCalls = async (input: LocInput) => runPromise((svc) => svc.outgoingCalls(input))
export namespace Diagnostic {
export function pretty(diagnostic: LSPClient.Diagnostic) {

View File

@@ -1,7 +1,9 @@
import path from "path"
import z from "zod"
import { Global } from "../global"
import { Filesystem } from "../util/filesystem"
import { Effect, Layer, ServiceMap } from "effect"
import { AppFileSystem } from "@/filesystem"
import { makeRuntime } from "@/effect/run-service"
export namespace McpAuth {
export const Tokens = z.object({
@@ -25,106 +27,155 @@ export namespace McpAuth {
clientInfo: ClientInfo.optional(),
codeVerifier: z.string().optional(),
oauthState: z.string().optional(),
serverUrl: z.string().optional(), // Track the URL these credentials are for
serverUrl: z.string().optional(),
})
export type Entry = z.infer<typeof Entry>
const filepath = path.join(Global.Path.data, "mcp-auth.json")
export async function get(mcpName: string): Promise<Entry | undefined> {
const data = await all()
return data[mcpName]
export interface Interface {
readonly all: () => Effect.Effect<Record<string, Entry>>
readonly get: (mcpName: string) => Effect.Effect<Entry | undefined>
readonly getForUrl: (mcpName: string, serverUrl: string) => Effect.Effect<Entry | undefined>
readonly set: (mcpName: string, entry: Entry, serverUrl?: string) => Effect.Effect<void>
readonly remove: (mcpName: string) => Effect.Effect<void>
readonly updateTokens: (mcpName: string, tokens: Tokens, serverUrl?: string) => Effect.Effect<void>
readonly updateClientInfo: (mcpName: string, clientInfo: ClientInfo, serverUrl?: string) => Effect.Effect<void>
readonly updateCodeVerifier: (mcpName: string, codeVerifier: string) => Effect.Effect<void>
readonly clearCodeVerifier: (mcpName: string) => Effect.Effect<void>
readonly updateOAuthState: (mcpName: string, oauthState: string) => Effect.Effect<void>
readonly getOAuthState: (mcpName: string) => Effect.Effect<string | undefined>
readonly clearOAuthState: (mcpName: string) => Effect.Effect<void>
readonly isTokenExpired: (mcpName: string) => Effect.Effect<boolean | null>
}
/**
* Get auth entry and validate it's for the correct URL.
* Returns undefined if URL has changed (credentials are invalid).
*/
export async function getForUrl(mcpName: string, serverUrl: string): Promise<Entry | undefined> {
const entry = await get(mcpName)
if (!entry) return undefined
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/McpAuth") {}
// If no serverUrl is stored, this is from an old version - consider it invalid
if (!entry.serverUrl) return undefined
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
// If URL has changed, credentials are invalid
if (entry.serverUrl !== serverUrl) return undefined
const all = Effect.fn("McpAuth.all")(function* () {
return yield* fs.readJson(filepath).pipe(
Effect.map((data) => data as Record<string, Entry>),
Effect.catch(() => Effect.succeed({} as Record<string, Entry>)),
)
})
return entry
}
const get = Effect.fn("McpAuth.get")(function* (mcpName: string) {
const data = yield* all()
return data[mcpName]
})
export async function all(): Promise<Record<string, Entry>> {
return Filesystem.readJson<Record<string, Entry>>(filepath).catch(() => ({}))
}
const getForUrl = Effect.fn("McpAuth.getForUrl")(function* (mcpName: string, serverUrl: string) {
const entry = yield* get(mcpName)
if (!entry) return undefined
if (!entry.serverUrl) return undefined
if (entry.serverUrl !== serverUrl) return undefined
return entry
})
export async function set(mcpName: string, entry: Entry, serverUrl?: string): Promise<void> {
const data = await all()
// Always update serverUrl if provided
if (serverUrl) {
entry.serverUrl = serverUrl
}
await Filesystem.writeJson(filepath, { ...data, [mcpName]: entry }, 0o600)
}
const set = Effect.fn("McpAuth.set")(function* (mcpName: string, entry: Entry, serverUrl?: string) {
const data = yield* all()
if (serverUrl) entry.serverUrl = serverUrl
yield* fs.writeJson(filepath, { ...data, [mcpName]: entry }, 0o600).pipe(Effect.orDie)
})
export async function remove(mcpName: string): Promise<void> {
const data = await all()
delete data[mcpName]
await Filesystem.writeJson(filepath, data, 0o600)
}
const remove = Effect.fn("McpAuth.remove")(function* (mcpName: string) {
const data = yield* all()
delete data[mcpName]
yield* fs.writeJson(filepath, data, 0o600).pipe(Effect.orDie)
})
export async function updateTokens(mcpName: string, tokens: Tokens, serverUrl?: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.tokens = tokens
await set(mcpName, entry, serverUrl)
}
const updateField = <K extends keyof Entry>(field: K, spanName: string) =>
Effect.fn(`McpAuth.${spanName}`)(function* (mcpName: string, value: NonNullable<Entry[K]>, serverUrl?: string) {
const entry = (yield* get(mcpName)) ?? {}
entry[field] = value
yield* set(mcpName, entry, serverUrl)
})
export async function updateClientInfo(mcpName: string, clientInfo: ClientInfo, serverUrl?: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.clientInfo = clientInfo
await set(mcpName, entry, serverUrl)
}
const clearField = <K extends keyof Entry>(field: K, spanName: string) =>
Effect.fn(`McpAuth.${spanName}`)(function* (mcpName: string) {
const entry = yield* get(mcpName)
if (entry) {
delete entry[field]
yield* set(mcpName, entry)
}
})
export async function updateCodeVerifier(mcpName: string, codeVerifier: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.codeVerifier = codeVerifier
await set(mcpName, entry)
}
const updateTokens = updateField("tokens", "updateTokens")
const updateClientInfo = updateField("clientInfo", "updateClientInfo")
const updateCodeVerifier = updateField("codeVerifier", "updateCodeVerifier")
const updateOAuthState = updateField("oauthState", "updateOAuthState")
const clearCodeVerifier = clearField("codeVerifier", "clearCodeVerifier")
const clearOAuthState = clearField("oauthState", "clearOAuthState")
export async function clearCodeVerifier(mcpName: string): Promise<void> {
const entry = await get(mcpName)
if (entry) {
delete entry.codeVerifier
await set(mcpName, entry)
}
}
const getOAuthState = Effect.fn("McpAuth.getOAuthState")(function* (mcpName: string) {
const entry = yield* get(mcpName)
return entry?.oauthState
})
export async function updateOAuthState(mcpName: string, oauthState: string): Promise<void> {
const entry = (await get(mcpName)) ?? {}
entry.oauthState = oauthState
await set(mcpName, entry)
}
const isTokenExpired = Effect.fn("McpAuth.isTokenExpired")(function* (mcpName: string) {
const entry = yield* get(mcpName)
if (!entry?.tokens) return null
if (!entry.tokens.expiresAt) return false
return entry.tokens.expiresAt < Date.now() / 1000
})
export async function getOAuthState(mcpName: string): Promise<string | undefined> {
const entry = await get(mcpName)
return entry?.oauthState
}
return Service.of({
all,
get,
getForUrl,
set,
remove,
updateTokens,
updateClientInfo,
updateCodeVerifier,
clearCodeVerifier,
updateOAuthState,
getOAuthState,
clearOAuthState,
isTokenExpired,
})
}),
)
export async function clearOAuthState(mcpName: string): Promise<void> {
const entry = await get(mcpName)
if (entry) {
delete entry.oauthState
await set(mcpName, entry)
}
}
const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
/**
* Check if stored tokens are expired.
* Returns null if no tokens exist, false if no expiry or not expired, true if expired.
*/
export async function isTokenExpired(mcpName: string): Promise<boolean | null> {
const entry = await get(mcpName)
if (!entry?.tokens) return null
if (!entry.tokens.expiresAt) return false
return entry.tokens.expiresAt < Date.now() / 1000
}
const { runPromise } = makeRuntime(Service, defaultLayer)
// Async facades for backward compat (used by McpOAuthProvider, CLI)
export const get = async (mcpName: string) => runPromise((svc) => svc.get(mcpName))
export const getForUrl = async (mcpName: string, serverUrl: string) =>
runPromise((svc) => svc.getForUrl(mcpName, serverUrl))
export const all = async () => runPromise((svc) => svc.all())
export const set = async (mcpName: string, entry: Entry, serverUrl?: string) =>
runPromise((svc) => svc.set(mcpName, entry, serverUrl))
export const remove = async (mcpName: string) => runPromise((svc) => svc.remove(mcpName))
export const updateTokens = async (mcpName: string, tokens: Tokens, serverUrl?: string) =>
runPromise((svc) => svc.updateTokens(mcpName, tokens, serverUrl))
export const updateClientInfo = async (mcpName: string, clientInfo: ClientInfo, serverUrl?: string) =>
runPromise((svc) => svc.updateClientInfo(mcpName, clientInfo, serverUrl))
export const updateCodeVerifier = async (mcpName: string, codeVerifier: string) =>
runPromise((svc) => svc.updateCodeVerifier(mcpName, codeVerifier))
export const clearCodeVerifier = async (mcpName: string) => runPromise((svc) => svc.clearCodeVerifier(mcpName))
export const updateOAuthState = async (mcpName: string, oauthState: string) =>
runPromise((svc) => svc.updateOAuthState(mcpName, oauthState))
export const getOAuthState = async (mcpName: string) => runPromise((svc) => svc.getOAuthState(mcpName))
export const clearOAuthState = async (mcpName: string) => runPromise((svc) => svc.clearOAuthState(mcpName))
export const isTokenExpired = async (mcpName: string) => runPromise((svc) => svc.isTokenExpired(mcpName))
}

File diff suppressed because it is too large Load Diff

View File

@@ -54,6 +54,9 @@ interface PendingAuth {
export namespace McpOAuthCallback {
let server: ReturnType<typeof Bun.serve> | undefined
const pendingAuths = new Map<string, PendingAuth>()
// Reverse index: mcpName → oauthState, so cancelPending(mcpName) can
// find the right entry in pendingAuths (which is keyed by oauthState).
const mcpNameToState = new Map<string, string>()
const CALLBACK_TIMEOUT_MS = 5 * 60 * 1000 // 5 minutes
@@ -98,6 +101,12 @@ export namespace McpOAuthCallback {
const pending = pendingAuths.get(state)!
clearTimeout(pending.timeout)
pendingAuths.delete(state)
for (const [name, s] of mcpNameToState) {
if (s === state) {
mcpNameToState.delete(name)
break
}
}
pending.reject(new Error(errorMsg))
}
return new Response(HTML_ERROR(errorMsg), {
@@ -126,6 +135,13 @@ export namespace McpOAuthCallback {
clearTimeout(pending.timeout)
pendingAuths.delete(state)
// Clean up reverse index
for (const [name, s] of mcpNameToState) {
if (s === state) {
mcpNameToState.delete(name)
break
}
}
pending.resolve(code)
return new Response(HTML_SUCCESS, {
@@ -137,11 +153,13 @@ export namespace McpOAuthCallback {
log.info("oauth callback server started", { port: OAUTH_CALLBACK_PORT })
}
export function waitForCallback(oauthState: string): Promise<string> {
export function waitForCallback(oauthState: string, mcpName?: string): Promise<string> {
if (mcpName) mcpNameToState.set(mcpName, oauthState)
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
if (pendingAuths.has(oauthState)) {
pendingAuths.delete(oauthState)
if (mcpName) mcpNameToState.delete(mcpName)
reject(new Error("OAuth callback timeout - authorization took too long"))
}
}, CALLBACK_TIMEOUT_MS)
@@ -151,10 +169,14 @@ export namespace McpOAuthCallback {
}
export function cancelPending(mcpName: string): void {
const pending = pendingAuths.get(mcpName)
// Look up the oauthState for this mcpName via the reverse index
const oauthState = mcpNameToState.get(mcpName)
const key = oauthState ?? mcpName
const pending = pendingAuths.get(key)
if (pending) {
clearTimeout(pending.timeout)
pendingAuths.delete(mcpName)
pendingAuths.delete(key)
mcpNameToState.delete(mcpName)
pending.reject(new Error("Authorization cancelled"))
}
}
@@ -184,6 +206,7 @@ export namespace McpOAuthCallback {
pending.reject(new Error("OAuth callback server stopped"))
}
pendingAuths.clear()
mcpNameToState.clear()
}
export function isRunning(): boolean {

View File

@@ -2,7 +2,7 @@ import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { Config } from "@/config/config"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { ProjectID } from "@/project/schema"
import { Instance } from "@/project/instance"
import { MessageID, SessionID } from "@/session/schema"
@@ -306,7 +306,7 @@ export namespace Permission {
return result
}
export const runPromise = makeRunPromise(Service, layer)
export const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: z.infer<typeof AskInput>) {
return runPromise((s) => s.ask(input))

View File

@@ -11,9 +11,9 @@ import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { PoeAuthPlugin } from "opencode-poe-auth"
import { Effect, Layer, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace Plugin {
const log = Log.create({ service: "plugin" })
@@ -52,6 +52,8 @@ export namespace Plugin {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const cache = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
@@ -146,16 +148,16 @@ export namespace Plugin {
}
})
// Subscribe to bus events, clean up when scope is closed
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribeAll(async (input) => {
// Subscribe to bus events, fiber interrupted when scope closes
yield* bus.subscribeAll().pipe(
Stream.runForEach((input) =>
Effect.sync(() => {
for (const hook of hooks) {
hook["event"]?.({ event: input })
hook["event"]?.({ event: input as any })
}
}),
),
(unsub) => Effect.sync(unsub),
Effect.forkScoped,
)
return { hooks }
@@ -192,7 +194,8 @@ export namespace Plugin {
}),
)
const runPromise = makeRunPromise(Service, layer)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function trigger<
Name extends TriggerName,

View File

@@ -7,14 +7,13 @@ import { Context } from "../util/context"
import { Project } from "./project"
import { State } from "./state"
export interface InstanceContext {
export interface Shape {
directory: string
worktree: string
project: Project.Info
}
const context = Context.create<InstanceContext>("instance")
const cache = new Map<string, Promise<InstanceContext>>()
const context = Context.create<Shape>("instance")
const cache = new Map<string, Promise<Shape>>()
const disposal = {
all: undefined as Promise<void> | undefined,
@@ -53,7 +52,7 @@ function boot(input: { directory: string; init?: () => Promise<any>; project?: P
})
}
function track(directory: string, next: Promise<InstanceContext>) {
function track(directory: string, next: Promise<Shape>) {
const task = next.catch((error) => {
if (cache.get(directory) === task) cache.delete(directory)
throw error

View File

@@ -11,7 +11,7 @@ import { ProjectID } from "./schema"
import { Effect, Layer, Path, Scope, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -462,7 +462,7 @@ export namespace Project {
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
// ---------------------------------------------------------------------------
// Promise-based API (delegates to Effect service via runPromise)

View File

@@ -1,9 +1,9 @@
import { Effect, Layer, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import path from "path"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { FileWatcher } from "@/file/watcher"
import { Git } from "@/git"
@@ -139,11 +139,12 @@ export namespace Vcs {
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Vcs") {}
export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service | Bus.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const git = yield* Git.Service
const bus = yield* Bus.Service
const state = yield* InstanceState.make<State>(
Effect.fn("Vcs.state")((ctx) =>
Effect.gen(function* () {
@@ -158,21 +159,19 @@ export namespace Vcs {
const value = { current, root }
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
FileWatcher.Event.Updated,
Instance.bind(async (evt) => {
if (!evt.properties.file.endsWith("HEAD")) return
const next = await get()
if (next === value.current) return
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
Stream.runForEach((_evt) =>
Effect.gen(function* () {
const next = yield* Effect.promise(() => get())
if (next !== value.current) {
log.info("branch changed", { from: value.current, to: next })
value.current = next
Bus.publish(Event.BranchUpdated, { branch: next })
}),
),
yield* bus.publish(Event.BranchUpdated, { branch: next })
}
}),
),
(unsubscribe) => Effect.sync(unsubscribe),
Effect.forkScoped,
)
return value
@@ -212,9 +211,13 @@ export namespace Vcs {
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Git.defaultLayer), Layer.provide(AppFileSystem.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(Git.defaultLayer),
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(Bus.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -2,7 +2,7 @@ import type { AuthOuathResult, Hooks } from "@opencode-ai/plugin"
import { NamedError } from "@opencode-ai/util/error"
import { Auth } from "@/auth"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Plugin } from "../plugin"
import { ProviderID } from "./schema"
import { Array as Arr, Effect, Layer, Record, Result, ServiceMap } from "effect"
@@ -231,7 +231,7 @@ export namespace ProviderAuth {
export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function methods() {
return runPromise((svc) => svc.methods())

View File

@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Instance } from "@/project/instance"
import { type IPty } from "bun-pty"
import z from "zod"
@@ -361,7 +361,7 @@ export namespace Pty {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function list() {
return runPromise((svc) => svc.list())

View File

@@ -2,7 +2,7 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Log } from "@/util/log"
import z from "zod"
@@ -197,7 +197,7 @@ export namespace Question {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: {
sessionID: SessionID

View File

@@ -63,7 +63,12 @@ export function toPartialRow(info: DeepPartial<Session.Info>) {
export default [
SyncEvent.project(Session.Event.Created, (db, data) => {
db.insert(SessionTable).values(Session.toRow(data.info)).run()
try {
db.insert(SessionTable).values(Session.toRow(data.info)).run()
} catch (err) {
if (!foreign(err)) throw err
log.warn("ignored session create — project missing", { sessionID: data.info.id, projectID: data.info.projectID })
}
}),
SyncEvent.project(Session.Event.Updated, (db, data) => {

View File

@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -55,6 +55,8 @@ export namespace SessionStatus {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const state = yield* InstanceState.make(
Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
)
@@ -70,9 +72,9 @@ export namespace SessionStatus {
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
const data = yield* InstanceState.get(state)
yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status }))
yield* bus.publish(Event.Status, { sessionID, status })
if (status.type === "idle") {
yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID }))
yield* bus.publish(Event.Idle, { sessionID })
data.delete(sessionID)
return
}
@@ -83,7 +85,8 @@ export namespace SessionStatus {
}),
)
const runPromise = makeRunPromise(Service, layer)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(sessionID: SessionID) {
return runPromise((svc) => svc.get(sessionID))

View File

@@ -7,7 +7,7 @@ import { NamedError } from "@opencode-ai/util/error"
import type { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Global } from "@/global"
import { Permission } from "@/permission"
@@ -54,11 +54,6 @@ export namespace Skill {
type State = {
skills: Record<string, Info>
dirs: Set<string>
task?: Promise<void>
}
type Cache = State & {
ensure: () => Promise<void>
}
export interface Interface {
@@ -116,66 +111,47 @@ export namespace Skill {
})
}
// TODO: Migrate to Effect
const create = (discovery: Discovery.Interface, directory: string, worktree: string): Cache => {
const state: State = {
skills: {},
dirs: new Set<string>(),
async function loadSkills(state: State, discovery: Discovery.Interface, directory: string, worktree: string) {
if (!Flag.OPENCODE_DISABLE_EXTERNAL_SKILLS) {
for (const dir of EXTERNAL_DIRS) {
const root = path.join(Global.Path.home, dir)
if (!(await Filesystem.isDir(root))) continue
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "global" })
}
for await (const root of Filesystem.up({
targets: EXTERNAL_DIRS,
start: directory,
stop: worktree,
})) {
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "project" })
}
}
const load = async () => {
if (!Flag.OPENCODE_DISABLE_EXTERNAL_SKILLS) {
for (const dir of EXTERNAL_DIRS) {
const root = path.join(Global.Path.home, dir)
if (!(await Filesystem.isDir(root))) continue
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "global" })
}
for (const dir of await Config.directories()) {
await scan(state, dir, OPENCODE_SKILL_PATTERN)
}
for await (const root of Filesystem.up({
targets: EXTERNAL_DIRS,
start: directory,
stop: worktree,
})) {
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "project" })
}
const cfg = await Config.get()
for (const item of cfg.skills?.paths ?? []) {
const expanded = item.startsWith("~/") ? path.join(os.homedir(), item.slice(2)) : item
const dir = path.isAbsolute(expanded) ? expanded : path.join(directory, expanded)
if (!(await Filesystem.isDir(dir))) {
log.warn("skill path not found", { path: dir })
continue
}
for (const dir of await Config.directories()) {
await scan(state, dir, OPENCODE_SKILL_PATTERN)
}
const cfg = await Config.get()
for (const item of cfg.skills?.paths ?? []) {
const expanded = item.startsWith("~/") ? path.join(os.homedir(), item.slice(2)) : item
const dir = path.isAbsolute(expanded) ? expanded : path.join(directory, expanded)
if (!(await Filesystem.isDir(dir))) {
log.warn("skill path not found", { path: dir })
continue
}
await scan(state, dir, SKILL_PATTERN)
}
for (const url of cfg.skills?.urls ?? []) {
for (const dir of await Effect.runPromise(discovery.pull(url))) {
state.dirs.add(dir)
await scan(state, dir, SKILL_PATTERN)
}
for (const url of cfg.skills?.urls ?? []) {
for (const dir of await Effect.runPromise(discovery.pull(url))) {
state.dirs.add(dir)
await scan(state, dir, SKILL_PATTERN)
}
}
log.info("init", { count: Object.keys(state.skills).length })
}
const ensure = () => {
if (state.task) return state.task
state.task = load().catch((err) => {
state.task = undefined
throw err
})
return state.task
}
return { ...state, ensure }
log.info("init", { count: Object.keys(state.skills).length })
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Skill") {}
@@ -185,33 +161,33 @@ export namespace Skill {
Effect.gen(function* () {
const discovery = yield* Discovery.Service
const state = yield* InstanceState.make(
Effect.fn("Skill.state")((ctx) => Effect.sync(() => create(discovery, ctx.directory, ctx.worktree))),
Effect.fn("Skill.state")((ctx) =>
Effect.gen(function* () {
const s: State = { skills: {}, dirs: new Set() }
yield* Effect.promise(() => loadSkills(s, discovery, ctx.directory, ctx.worktree))
return s
}),
),
)
const ensure = Effect.fn("Skill.ensure")(function* () {
const cache = yield* InstanceState.get(state)
yield* Effect.promise(() => cache.ensure())
return cache
})
const get = Effect.fn("Skill.get")(function* (name: string) {
const cache = yield* ensure()
return cache.skills[name]
const s = yield* InstanceState.get(state)
return s.skills[name]
})
const all = Effect.fn("Skill.all")(function* () {
const cache = yield* ensure()
return Object.values(cache.skills)
const s = yield* InstanceState.get(state)
return Object.values(s.skills)
})
const dirs = Effect.fn("Skill.dirs")(function* () {
const cache = yield* ensure()
return Array.from(cache.dirs)
const s = yield* InstanceState.get(state)
return Array.from(s.dirs)
})
const available = Effect.fn("Skill.available")(function* (agent?: Agent.Info) {
const cache = yield* ensure()
const list = Object.values(cache.skills).toSorted((a, b) => a.name.localeCompare(b.name))
const s = yield* InstanceState.get(state)
const list = Object.values(s.skills).toSorted((a, b) => a.name.localeCompare(b.name))
if (!agent) return list
return list.filter((skill) => Permission.evaluate("skill", skill.name, agent.permission).action !== "deny")
})
@@ -242,7 +218,7 @@ export namespace Skill {
return ["## Available Skills", ...list.map((skill) => `- **${skill.name}**: ${skill.description}`)].join("\n")
}
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(name: string) {
return runPromise((skill) => skill.get(name))

View File

@@ -5,7 +5,7 @@ import path from "path"
import z from "zod"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { Hash } from "@/util/hash"
import { Config } from "../config/config"
@@ -459,7 +459,7 @@ export namespace Snapshot {
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function init() {
return runPromise((svc) => svc.init())

View File

@@ -13,6 +13,7 @@ import { LSP } from "../lsp"
import { Filesystem } from "../util/filesystem"
import DESCRIPTION from "./apply_patch.txt"
import { File } from "../file"
import { Format } from "../format"
const PatchParams = z.object({
patchText: z.string().describe("The full patch text that describes all changes to be made"),
@@ -220,9 +221,8 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
}
if (edited) {
await Bus.publish(File.Event.Edited, {
file: edited,
})
await Format.file(edited)
Bus.publish(File.Event.Edited, { file: edited })
}
}

View File

@@ -12,6 +12,7 @@ import DESCRIPTION from "./edit.txt"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Bus } from "../bus"
import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -71,9 +72,8 @@ export const EditTool = Tool.define("edit", {
},
})
await Filesystem.write(filePath, params.newString)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
await Format.file(filePath)
Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: existed ? "change" : "add",
@@ -108,9 +108,8 @@ export const EditTool = Tool.define("edit", {
})
await Filesystem.write(filePath, contentNew)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
await Format.file(filePath)
Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: "change",

View File

@@ -31,7 +31,7 @@ import { Glob } from "../util/glob"
import { pathToFileURL } from "url"
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace ToolRegistry {
const log = Log.create({ service: "tool.registry" })
@@ -198,7 +198,7 @@ export namespace ToolRegistry {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function register(tool: Tool.Info) {
return runPromise((svc) => svc.register(tool))

View File

@@ -2,7 +2,7 @@ import { NodePath } from "@effect/platform-node"
import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect"
import path from "path"
import type { Agent } from "../agent/agent"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { evaluate } from "@/permission/evaluate"
import { Identifier } from "../id/id"
@@ -136,7 +136,7 @@ export namespace Truncate {
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer))
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function output(text: string, options: Options = {}, agent?: Agent.Info): Promise<Result> {
return runPromise((s) => s.output(text, options, agent))

View File

@@ -7,6 +7,7 @@ import DESCRIPTION from "./write.txt"
import { Bus } from "../bus"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -42,9 +43,8 @@ export const WriteTool = Tool.define("write", {
})
await Filesystem.write(filepath, params.content)
await Bus.publish(File.Event.Edited, {
file: filepath,
})
await Format.file(filepath)
Bus.publish(File.Event.Edited, { file: filepath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filepath,
event: exists ? "change" : "add",

View File

@@ -15,7 +15,7 @@ import { Git } from "@/git"
import { Effect, FileSystem, Layer, Path, Scope, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
export namespace Worktree {
@@ -576,7 +576,7 @@ export namespace Worktree {
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function makeWorktreeInfo(name?: string) {
return runPromise((svc) => svc.makeWorktreeInfo(name))

View File

@@ -0,0 +1,164 @@
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Stream } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const TestEvent = {
Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
}
const node = NodeChildProcessSpawner.layer.pipe(
Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
)
const live = Layer.mergeAll(Bus.layer, node)
const it = testEffect(live)
describe("Bus (Effect-native)", () => {
it.effect("publish + subscribe stream delivers events", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Deferred.await(done)
expect(received).toEqual([1, 2])
}),
),
)
it.effect("subscribe filters by event type", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Pong, { message: "ignored" })
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done)
expect(pings).toEqual([42])
}),
),
)
it.effect("subscribeAll receives all types", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const types: string[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
expect(types).toContain("test.effect.ping")
expect(types).toContain("test.effect.pong")
}),
),
)
it.effect("multiple subscribers each receive the event", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const a: number[] = []
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(doneA)
yield* Deferred.await(doneB)
expect(a).toEqual([99])
expect(b).toEqual([99])
}),
),
)
it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
const types: string[] = []
const seen = yield* Deferred.make<void>()
const disposed = yield* Deferred.make<void>()
// Set up subscriber inside the instance
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(seen)
}).pipe(provideInstance(dir))
// Dispose from OUTSIDE the instance scope
yield* Effect.promise(() => Instance.disposeAll())
yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
expect(types).toContain("test.effect.ping")
expect(types).toContain(Bus.InstanceDisposed.type)
}),
)
})

View File

@@ -0,0 +1,87 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() }))
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
describe("Bus integration: acquireRelease subscriber pattern", () => {
afterEach(() => Instance.disposeAll())
test("subscriber via callback facade receives events and cleans up on unsub", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bus.publish(TestEvent, { value: 2 })
await Bun.sleep(10)
expect(received).toEqual([1, 2])
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 3 })
await Bun.sleep(10)
expect(received).toEqual([1, 2])
})
})
test("subscribeAll receives events from multiple types", async () => {
await using tmp = await tmpdir()
const received: Array<{ type: string; value?: number }> = []
const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() }))
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push({ type: evt.type, value: evt.properties.value })
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 10 })
await Bus.publish(OtherEvent, { value: 20 })
await Bun.sleep(10)
})
expect(received).toEqual([
{ type: "test.integration", value: 10 },
{ type: "test.other", value: 20 },
])
})
test("subscriber cleanup on instance disposal interrupts the stream", async () => {
await using tmp = await tmpdir()
const received: number[] = []
let disposed = false
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
if (evt.type === Bus.InstanceDisposed.type) {
disposed = true
return
}
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bun.sleep(10)
})
await Instance.disposeAll()
await Bun.sleep(50)
expect(received).toEqual([1])
expect(disposed).toBe(true)
})
})

View File

@@ -0,0 +1,219 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = {
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
describe("Bus", () => {
afterEach(() => Instance.disposeAll())
describe("publish + subscribe", () => {
test("subscriber is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bun.sleep(10)
})
expect(received).toEqual([42])
})
test("subscriber receives matching events", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
// Give the subscriber fiber time to start consuming
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bus.publish(TestEvent.Ping, { value: 99 })
// Give subscriber time to process
await Bun.sleep(10)
})
expect(received).toEqual([42, 99])
})
test("subscriber does not receive events of other types", async () => {
await using tmp = await tmpdir()
const pings: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
pings.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Pong, { message: "hello" })
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
expect(pings).toEqual([1])
})
test("publish with no subscribers does not throw", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
})
describe("unsubscribe", () => {
test("unsubscribe stops delivery", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
expect(received).toEqual([1])
})
})
describe("subscribeAll", () => {
test("subscribeAll is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
expect(received).toEqual(["test.ping"])
})
test("receives all event types", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Pong, { message: "hi" })
await Bun.sleep(10)
})
expect(received).toContain("test.ping")
expect(received).toContain("test.pong")
})
})
describe("multiple subscribers", () => {
test("all subscribers for same event type are called", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
a.push(evt.properties.value)
})
Bus.subscribe(TestEvent.Ping, (evt) => {
b.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 7 })
await Bun.sleep(10)
})
expect(a).toEqual([7])
expect(b).toEqual([7])
})
})
describe("instance isolation", () => {
test("events in one directory do not reach subscribers in another", async () => {
await using tmpA = await tmpdir()
await using tmpB = await tmpdir()
const receivedA: number[] = []
const receivedB: number[] = []
await withInstance(tmpA.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedA.push(evt.properties.value)
})
await Bun.sleep(10)
})
await withInstance(tmpB.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedB.push(evt.properties.value)
})
await Bun.sleep(10)
})
await withInstance(tmpA.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
await withInstance(tmpB.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
expect(receivedA).toEqual([1])
expect(receivedB).toEqual([2])
})
})
describe("instance disposal", () => {
test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
// Instance.disposeAll triggers the finalizer which publishes InstanceDisposed
await Instance.disposeAll()
await Bun.sleep(50)
expect(received).toContain("test.ping")
expect(received).toContain(Bus.InstanceDisposed.type)
})
})
})

View File

@@ -34,7 +34,7 @@ async function check(map: (dir: string) => string) {
await using tmp = await tmpdir({ git: true, config: { snapshot: true } })
const prev = Global.Path.config
;(Global.Path as { config: string }).config = globalTmp.path
await Config.invalidate()
Config.global.reset()
try {
await writeConfig(globalTmp.path, {
$schema: "https://opencode.ai/config.json",
@@ -52,7 +52,7 @@ async function check(map: (dir: string) => string) {
} finally {
await Instance.disposeAll()
;(Global.Path as { config: string }).config = prev
await Config.invalidate()
Config.global.reset()
}
}

View File

@@ -1,10 +1,10 @@
import { expect, test } from "bun:test"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRunPromise } from "../../src/effect/run-service"
import { makeRuntime } from "../../src/effect/run-service"
class Shared extends ServiceMap.Service<Shared, { readonly id: number }>()("@test/Shared") {}
test("makeRunPromise shares dependent layers through the shared memo map", async () => {
test("makeRuntime shares dependent layers through the shared memo map", async () => {
let n = 0
const shared = Layer.effect(
@@ -37,8 +37,8 @@ test("makeRunPromise shares dependent layers through the shared memo map", async
}),
).pipe(Layer.provide(shared))
const runOne = makeRunPromise(One, one)
const runTwo = makeRunPromise(Two, two)
const { runPromise: runOne } = makeRuntime(One, one)
const { runPromise: runTwo } = makeRuntime(Two, two)
expect(await runOne((svc) => svc.get())).toBe(1)
expect(await runTwo((svc) => svc.get())).toBe(1)

View File

@@ -2,9 +2,8 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Deferred, Effect, Option } from "effect"
import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
@@ -16,20 +15,33 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
return withServices(
return Instance.provide({
directory,
FileWatcher.layer,
async (rt) => {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
fn: async () => {
const layer: Layer.Layer<FileWatcher.Service, never, never> = FileWatcher.layer.pipe(
Layer.provide(watcherConfigLayer),
)
const rt = ManagedRuntime.make(layer)
try {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
} finally {
await rt.dispose()
}
},
{ provide: [watcherConfigLayer] },
)
})
}
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {

View File

@@ -2,7 +2,10 @@ import { $ } from "bun"
import * as fs from "fs/promises"
import os from "os"
import path from "path"
import { Effect, FileSystem, ServiceMap } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "../../src/config/config"
import { Instance } from "../../src/project/instance"
// Strip null bytes from paths (defensive fix for CI environment issues)
function sanitizePath(p: string): string {
@@ -71,3 +74,68 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
}
return result
}
/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
return Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
const git = (...args: string[]) =>
spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
if (options?.git) {
yield* git("init")
yield* git("config", "core.fsmonitor", "false")
yield* git("config", "user.email", "test@opencode.test")
yield* git("config", "user.name", "Test")
yield* git("commit", "--allow-empty", "-m", "root commit")
}
if (options?.config) {
yield* fs.writeFileString(
path.join(dir, "opencode.json"),
JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
)
}
return dir
})
}
export const provideInstance =
(directory: string) =>
<A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
Effect.servicesWith((services: ServiceMap.ServiceMap<R>) =>
Effect.promise<A>(async () =>
Instance.provide({
directory,
fn: () => Effect.runPromiseWith(services)(self),
}),
),
)
export function provideTmpdirInstance<A, E, R>(
self: (path: string) => Effect.Effect<A, E, R>,
options?: { git?: boolean; config?: Partial<Config.Info> },
) {
return Effect.gen(function* () {
const path = yield* tmpdirScoped(options)
let provided = false
yield* Effect.addFinalizer(() =>
provided
? Effect.promise(() =>
Instance.provide({
directory: path,
fn: () => Instance.dispose(),
}),
).pipe(Effect.ignore)
: Effect.void,
)
provided = true
return yield* self(path).pipe(provideInstance(path))
})
}

View File

@@ -1,42 +0,0 @@
import { ConfigProvider, Layer, ManagedRuntime } from "effect"
import { Instance } from "../../src/project/instance"
/** ConfigProvider that enables the experimental file watcher. */
export const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
/**
* Boot an Instance with the given service layers and run `body` with
* the ManagedRuntime. Cleanup is automatic — the runtime is disposed
* and Instance context is torn down when `body` completes.
*
* Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
*/
export function withServices<S>(
directory: string,
layer: Layer.Layer<S>,
body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
options?: { provide?: Layer.Layer<never>[] },
) {
return Instance.provide({
directory,
fn: async () => {
let resolved: Layer.Layer<S> = layer
if (options?.provide) {
for (const l of options.provide) {
resolved = resolved.pipe(Layer.provide(l)) as any
}
}
const rt = ManagedRuntime.make(resolved)
try {
await body(rt)
} finally {
await rt.dispose()
}
},
})
}

View File

@@ -1,172 +1,182 @@
import { Effect } from "effect"
import { afterEach, describe, expect, test } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import { withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { File } from "../../src/file"
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { Format } from "../../src/format"
import * as Formatter from "../../src/format/formatter"
import { Instance } from "../../src/project/instance"
const node = NodeChildProcessSpawner.layer.pipe(
Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
)
const it = testEffect(Layer.mergeAll(Format.layer, node))
describe("Format", () => {
afterEach(async () => {
await Instance.disposeAll()
})
it.effect("status() returns built-in formatters when no config overrides", () =>
provideTmpdirInstance(() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
const statuses = yield* fmt.status()
expect(Array.isArray(statuses)).toBe(true)
expect(statuses.length).toBeGreaterThan(0)
test("status() returns built-in formatters when no config overrides", async () => {
await using tmp = await tmpdir()
for (const item of statuses) {
expect(typeof item.name).toBe("string")
expect(Array.isArray(item.extensions)).toBe(true)
expect(typeof item.enabled).toBe("boolean")
}
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
expect(Array.isArray(statuses)).toBe(true)
expect(statuses.length).toBeGreaterThan(0)
const gofmt = statuses.find((item) => item.name === "gofmt")
expect(gofmt).toBeDefined()
expect(gofmt!.extensions).toContain(".go")
}),
),
),
)
for (const s of statuses) {
expect(typeof s.name).toBe("string")
expect(Array.isArray(s.extensions)).toBe(true)
expect(typeof s.enabled).toBe("boolean")
}
it.effect("status() returns empty list when formatter is disabled", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
expect(yield* fmt.status()).toEqual([])
}),
),
{ config: { formatter: false } },
),
)
const gofmt = statuses.find((s) => s.name === "gofmt")
expect(gofmt).toBeDefined()
expect(gofmt!.extensions).toContain(".go")
})
})
test("status() returns empty list when formatter is disabled", async () => {
await using tmp = await tmpdir({
config: { formatter: false },
})
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
expect(statuses).toEqual([])
})
})
test("status() excludes formatters marked as disabled in config", async () => {
await using tmp = await tmpdir({
config: {
formatter: {
gofmt: { disabled: true },
it.effect("status() excludes formatters marked as disabled in config", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
const statuses = yield* fmt.status()
const gofmt = statuses.find((item) => item.name === "gofmt")
expect(gofmt).toBeUndefined()
}),
),
{
config: {
formatter: {
gofmt: { disabled: true },
},
},
},
})
),
)
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
const gofmt = statuses.find((s) => s.name === "gofmt")
expect(gofmt).toBeUndefined()
})
})
it.effect("service initializes without error", () =>
provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
)
test("service initializes without error", async () => {
await using tmp = await tmpdir()
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use(() => Effect.void))
})
})
test("status() initializes formatter state per directory", async () => {
await using off = await tmpdir({
config: { formatter: false },
})
await using on = await tmpdir()
const a = await Instance.provide({
directory: off.path,
fn: () => Format.status(),
})
const b = await Instance.provide({
directory: on.path,
fn: () => Format.status(),
})
expect(a).toEqual([])
expect(b.length).toBeGreaterThan(0)
})
test("runs enabled checks for matching formatters in parallel", async () => {
await using tmp = await tmpdir()
const file = `${tmp.path}/test.parallel`
await Bun.write(file, "x")
const one = {
extensions: Formatter.gofmt.extensions,
enabled: Formatter.gofmt.enabled,
command: Formatter.gofmt.command,
}
const two = {
extensions: Formatter.mix.extensions,
enabled: Formatter.mix.enabled,
command: Formatter.mix.command,
}
let active = 0
let max = 0
Formatter.gofmt.extensions = [".parallel"]
Formatter.mix.extensions = [".parallel"]
Formatter.gofmt.command = ["sh", "-c", "true"]
Formatter.mix.command = ["sh", "-c", "true"]
Formatter.gofmt.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
Formatter.mix.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
try {
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use((s) => s.init()))
await Bus.publish(File.Event.Edited, { file })
it.effect("status() initializes formatter state per directory", () =>
Effect.gen(function* () {
const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
config: { formatter: false },
})
} finally {
Formatter.gofmt.extensions = one.extensions
Formatter.gofmt.enabled = one.enabled
Formatter.gofmt.command = one.command
Formatter.mix.extensions = two.extensions
Formatter.mix.enabled = two.enabled
Formatter.mix.command = two.command
}
const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()))
expect(max).toBe(2)
})
expect(a).toEqual([])
expect(b.length).toBeGreaterThan(0)
}),
)
test("runs matching formatters sequentially for the same file", async () => {
await using tmp = await tmpdir({
config: {
formatter: {
first: {
command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
second: {
command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
it.effect("runs enabled checks for matching formatters in parallel", () =>
provideTmpdirInstance((path) =>
Effect.gen(function* () {
const file = `${path}/test.parallel`
yield* Effect.promise(() => Bun.write(file, "x"))
const one = {
extensions: Formatter.gofmt.extensions,
enabled: Formatter.gofmt.enabled,
command: Formatter.gofmt.command,
}
const two = {
extensions: Formatter.mix.extensions,
enabled: Formatter.mix.enabled,
command: Formatter.mix.command,
}
let active = 0
let max = 0
yield* Effect.acquireUseRelease(
Effect.sync(() => {
Formatter.gofmt.extensions = [".parallel"]
Formatter.mix.extensions = [".parallel"]
Formatter.gofmt.command = ["sh", "-c", "true"]
Formatter.mix.command = ["sh", "-c", "true"]
Formatter.gofmt.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
Formatter.mix.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
}),
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
yield* fmt.init()
yield* fmt.file(file)
}),
),
() =>
Effect.sync(() => {
Formatter.gofmt.extensions = one.extensions
Formatter.gofmt.enabled = one.enabled
Formatter.gofmt.command = one.command
Formatter.mix.extensions = two.extensions
Formatter.mix.enabled = two.enabled
Formatter.mix.command = two.command
}),
)
expect(max).toBe(2)
}),
),
)
it.effect("runs matching formatters sequentially for the same file", () =>
provideTmpdirInstance(
(path) =>
Effect.gen(function* () {
const file = `${path}/test.seq`
yield* Effect.promise(() => Bun.write(file, "x"))
yield* Format.Service.use((fmt) =>
Effect.gen(function* () {
yield* fmt.init()
yield* fmt.file(file)
}),
)
expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB")
}),
{
config: {
formatter: {
first: {
command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
second: {
command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
},
},
},
})
const file = `${tmp.path}/test.seq`
await Bun.write(file, "x")
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use((s) => s.init()))
await Bus.publish(File.Event.Edited, { file })
})
expect(await Bun.file(file).text()).toBe("xAB")
})
),
)
})

View File

@@ -0,0 +1,151 @@
import { describe, expect, test, spyOn, beforeEach } from "bun:test"
import path from "path"
import * as Lsp from "../../src/lsp/index"
import { LSPServer } from "../../src/lsp/server"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
function withInstance(fn: (dir: string) => Promise<void>) {
return async () => {
await using tmp = await tmpdir()
try {
await Instance.provide({
directory: tmp.path,
fn: () => fn(tmp.path),
})
} finally {
await Instance.disposeAll()
}
}
}
describe("LSP service lifecycle", () => {
let spawnSpy: ReturnType<typeof spyOn>
beforeEach(() => {
spawnSpy = spyOn(LSPServer.Typescript, "spawn").mockResolvedValue(undefined)
})
test(
"init() completes without error",
withInstance(async () => {
await Lsp.LSP.init()
}),
)
test(
"status() returns empty array initially",
withInstance(async () => {
const result = await Lsp.LSP.status()
expect(Array.isArray(result)).toBe(true)
expect(result.length).toBe(0)
spawnSpy.mockRestore()
}),
)
test(
"diagnostics() returns empty object initially",
withInstance(async () => {
const result = await Lsp.LSP.diagnostics()
expect(typeof result).toBe("object")
expect(Object.keys(result).length).toBe(0)
spawnSpy.mockRestore()
}),
)
test(
"hasClients() returns true for .ts files in instance",
withInstance(async (dir) => {
const result = await Lsp.LSP.hasClients(path.join(dir, "test.ts"))
expect(result).toBe(true)
spawnSpy.mockRestore()
}),
)
test(
"hasClients() returns false for files outside instance",
withInstance(async (dir) => {
const result = await Lsp.LSP.hasClients(path.join(dir, "..", "outside.ts"))
// hasClients checks servers but doesn't check containsPath — getClients does
// So hasClients may return true even for outside files (it checks extension + root)
// The guard is in getClients, not hasClients
expect(typeof result).toBe("boolean")
spawnSpy.mockRestore()
}),
)
test(
"workspaceSymbol() returns empty array with no clients",
withInstance(async () => {
const result = await Lsp.LSP.workspaceSymbol("test")
expect(Array.isArray(result)).toBe(true)
expect(result.length).toBe(0)
spawnSpy.mockRestore()
}),
)
test(
"definition() returns empty array for unknown file",
withInstance(async (dir) => {
const result = await Lsp.LSP.definition({
file: path.join(dir, "nonexistent.ts"),
line: 0,
character: 0,
})
expect(Array.isArray(result)).toBe(true)
spawnSpy.mockRestore()
}),
)
test(
"references() returns empty array for unknown file",
withInstance(async (dir) => {
const result = await Lsp.LSP.references({
file: path.join(dir, "nonexistent.ts"),
line: 0,
character: 0,
})
expect(Array.isArray(result)).toBe(true)
spawnSpy.mockRestore()
}),
)
test(
"multiple init() calls are idempotent",
withInstance(async () => {
await Lsp.LSP.init()
await Lsp.LSP.init()
await Lsp.LSP.init()
// Should not throw or create duplicate state
spawnSpy.mockRestore()
}),
)
})
describe("LSP.Diagnostic", () => {
test("pretty() formats error diagnostic", () => {
const result = Lsp.LSP.Diagnostic.pretty({
range: { start: { line: 9, character: 4 }, end: { line: 9, character: 10 } },
message: "Type 'string' is not assignable to type 'number'",
severity: 1,
} as any)
expect(result).toBe("ERROR [10:5] Type 'string' is not assignable to type 'number'")
})
test("pretty() formats warning diagnostic", () => {
const result = Lsp.LSP.Diagnostic.pretty({
range: { start: { line: 0, character: 0 }, end: { line: 0, character: 5 } },
message: "Unused variable",
severity: 2,
} as any)
expect(result).toBe("WARN [1:1] Unused variable")
})
test("pretty() defaults to ERROR when no severity", () => {
const result = Lsp.LSP.Diagnostic.pretty({
range: { start: { line: 0, character: 0 }, end: { line: 0, character: 1 } },
message: "Something wrong",
} as any)
expect(result).toBe("ERROR [1:1] Something wrong")
})
})

View File

@@ -0,0 +1,660 @@
import { test, expect, mock, beforeEach } from "bun:test"
// --- Mock infrastructure ---
// Per-client state for controlling mock behavior
interface MockClientState {
tools: Array<{ name: string; description?: string; inputSchema: object }>
listToolsCalls: number
listToolsShouldFail: boolean
listToolsError: string
listPromptsShouldFail: boolean
listResourcesShouldFail: boolean
prompts: Array<{ name: string; description?: string }>
resources: Array<{ name: string; uri: string; description?: string }>
closed: boolean
notificationHandlers: Map<unknown, (...args: any[]) => any>
}
const clientStates = new Map<string, MockClientState>()
let lastCreatedClientName: string | undefined
let connectShouldFail = false
let connectError = "Mock transport cannot connect"
// Tracks how many Client instances were created (detects leaks)
let clientCreateCount = 0
function getOrCreateClientState(name?: string): MockClientState {
const key = name ?? "default"
let state = clientStates.get(key)
if (!state) {
state = {
tools: [{ name: "test_tool", description: "A test tool", inputSchema: { type: "object", properties: {} } }],
listToolsCalls: 0,
listToolsShouldFail: false,
listToolsError: "listTools failed",
listPromptsShouldFail: false,
listResourcesShouldFail: false,
prompts: [],
resources: [],
closed: false,
notificationHandlers: new Map(),
}
clientStates.set(key, state)
}
return state
}
// Mock transport that succeeds or fails based on connectShouldFail
class MockStdioTransport {
stderr: null = null
pid = 12345
constructor(_opts: any) {}
async start() {
if (connectShouldFail) throw new Error(connectError)
}
async close() {}
}
class MockStreamableHTTP {
constructor(_url: URL, _opts?: any) {}
async start() {
if (connectShouldFail) throw new Error(connectError)
}
async close() {}
async finishAuth() {}
}
class MockSSE {
constructor(_url: URL, _opts?: any) {}
async start() {
throw new Error("SSE fallback - not used in these tests")
}
async close() {}
}
mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
StdioClientTransport: MockStdioTransport,
}))
mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({
StreamableHTTPClientTransport: MockStreamableHTTP,
}))
mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({
SSEClientTransport: MockSSE,
}))
mock.module("@modelcontextprotocol/sdk/client/auth.js", () => ({
UnauthorizedError: class extends Error {
constructor() {
super("Unauthorized")
}
},
}))
// Mock Client that delegates to per-name MockClientState
mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({
Client: class MockClient {
_state!: MockClientState
transport: any
constructor(_opts: any) {
clientCreateCount++
}
async connect(transport: { start: () => Promise<void> }) {
this.transport = transport
await transport.start()
// After successful connect, bind to the last-created client name
this._state = getOrCreateClientState(lastCreatedClientName)
}
setNotificationHandler(schema: unknown, handler: (...args: any[]) => any) {
this._state?.notificationHandlers.set(schema, handler)
}
async listTools() {
if (this._state) this._state.listToolsCalls++
if (this._state?.listToolsShouldFail) {
throw new Error(this._state.listToolsError)
}
return { tools: this._state?.tools ?? [] }
}
async listPrompts() {
if (this._state?.listPromptsShouldFail) {
throw new Error("listPrompts failed")
}
return { prompts: this._state?.prompts ?? [] }
}
async listResources() {
if (this._state?.listResourcesShouldFail) {
throw new Error("listResources failed")
}
return { resources: this._state?.resources ?? [] }
}
async close() {
if (this._state) this._state.closed = true
}
},
}))
beforeEach(() => {
clientStates.clear()
lastCreatedClientName = undefined
connectShouldFail = false
connectError = "Mock transport cannot connect"
clientCreateCount = 0
})
// Import after mocks
const { MCP } = await import("../../src/mcp/index")
const { Instance } = await import("../../src/project/instance")
const { tmpdir } = await import("../fixture/fixture")
// --- Helper ---
function withInstance(config: Record<string, any>, fn: () => Promise<void>) {
return async () => {
await using tmp = await tmpdir({
init: async (dir) => {
await Bun.write(
`${dir}/opencode.json`,
JSON.stringify({
$schema: "https://opencode.ai/config.json",
mcp: config,
}),
)
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
await fn()
// dispose instance to clean up state between tests
await Instance.dispose()
},
})
}
}
// ========================================================================
// Test: tools() are cached after connect
// ========================================================================
test(
"tools() reuses cached tool definitions after connect",
withInstance({}, async () => {
lastCreatedClientName = "my-server"
const serverState = getOrCreateClientState("my-server")
serverState.tools = [
{ name: "do_thing", description: "does a thing", inputSchema: { type: "object", properties: {} } },
]
// First: add the server successfully
const addResult = await MCP.add("my-server", {
type: "local",
command: ["echo", "test"],
})
expect((addResult.status as any)["my-server"]?.status ?? (addResult.status as any).status).toBe("connected")
expect(serverState.listToolsCalls).toBe(1)
const toolsA = await MCP.tools()
const toolsB = await MCP.tools()
expect(Object.keys(toolsA).length).toBeGreaterThan(0)
expect(Object.keys(toolsB).length).toBeGreaterThan(0)
expect(serverState.listToolsCalls).toBe(1)
}),
)
// ========================================================================
// Test: tool change notifications refresh the cache
// ========================================================================
test(
"tool change notifications refresh cached tool definitions",
withInstance({}, async () => {
lastCreatedClientName = "status-server"
const serverState = getOrCreateClientState("status-server")
await MCP.add("status-server", {
type: "local",
command: ["echo", "test"],
})
const before = await MCP.tools()
expect(Object.keys(before).some((key) => key.includes("test_tool"))).toBe(true)
expect(serverState.listToolsCalls).toBe(1)
serverState.tools = [{ name: "next_tool", description: "next", inputSchema: { type: "object", properties: {} } }]
const handler = Array.from(serverState.notificationHandlers.values())[0]
expect(handler).toBeDefined()
await handler?.()
const after = await MCP.tools()
expect(Object.keys(after).some((key) => key.includes("next_tool"))).toBe(true)
expect(Object.keys(after).some((key) => key.includes("test_tool"))).toBe(false)
expect(serverState.listToolsCalls).toBe(2)
}),
)
// ========================================================================
// Test: connect() / disconnect() lifecycle
// ========================================================================
test(
"disconnect sets status to disabled and removes client",
withInstance(
{
"disc-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "disc-server"
getOrCreateClientState("disc-server")
await MCP.add("disc-server", {
type: "local",
command: ["echo", "test"],
})
const statusBefore = await MCP.status()
expect(statusBefore["disc-server"]?.status).toBe("connected")
await MCP.disconnect("disc-server")
const statusAfter = await MCP.status()
expect(statusAfter["disc-server"]?.status).toBe("disabled")
// Tools should be empty after disconnect
const tools = await MCP.tools()
const serverTools = Object.keys(tools).filter((k) => k.startsWith("disc-server"))
expect(serverTools.length).toBe(0)
},
),
)
test(
"connect() after disconnect() re-establishes the server",
withInstance(
{
"reconn-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "reconn-server"
const serverState = getOrCreateClientState("reconn-server")
serverState.tools = [{ name: "my_tool", description: "a tool", inputSchema: { type: "object", properties: {} } }]
await MCP.add("reconn-server", {
type: "local",
command: ["echo", "test"],
})
await MCP.disconnect("reconn-server")
expect((await MCP.status())["reconn-server"]?.status).toBe("disabled")
// Reconnect
await MCP.connect("reconn-server")
expect((await MCP.status())["reconn-server"]?.status).toBe("connected")
const tools = await MCP.tools()
expect(Object.keys(tools).some((k) => k.includes("my_tool"))).toBe(true)
},
),
)
// ========================================================================
// Test: add() closes existing client before replacing
// ========================================================================
test(
"add() closes the old client when replacing a server",
// Don't put the server in config — add it dynamically so we control
// exactly which client instance is "first" vs "second".
withInstance({}, async () => {
lastCreatedClientName = "replace-server"
const firstState = getOrCreateClientState("replace-server")
await MCP.add("replace-server", {
type: "local",
command: ["echo", "test"],
})
expect(firstState.closed).toBe(false)
// Create new state for second client
clientStates.delete("replace-server")
const secondState = getOrCreateClientState("replace-server")
// Re-add should close the first client
await MCP.add("replace-server", {
type: "local",
command: ["echo", "test"],
})
expect(firstState.closed).toBe(true)
expect(secondState.closed).toBe(false)
}),
)
// ========================================================================
// Test: state init with mixed success/failure
// ========================================================================
test(
"init connects available servers even when one fails",
withInstance(
{
"good-server": {
type: "local",
command: ["echo", "good"],
},
"bad-server": {
type: "local",
command: ["echo", "bad"],
},
},
async () => {
// Set up good server
const goodState = getOrCreateClientState("good-server")
goodState.tools = [{ name: "good_tool", description: "works", inputSchema: { type: "object", properties: {} } }]
// Set up bad server - will fail on listTools during create()
const badState = getOrCreateClientState("bad-server")
badState.listToolsShouldFail = true
// Add good server first
lastCreatedClientName = "good-server"
await MCP.add("good-server", {
type: "local",
command: ["echo", "good"],
})
// Add bad server - should fail but not affect good server
lastCreatedClientName = "bad-server"
await MCP.add("bad-server", {
type: "local",
command: ["echo", "bad"],
})
const status = await MCP.status()
expect(status["good-server"]?.status).toBe("connected")
expect(status["bad-server"]?.status).toBe("failed")
// Good server's tools should still be available
const tools = await MCP.tools()
expect(Object.keys(tools).some((k) => k.includes("good_tool"))).toBe(true)
},
),
)
// ========================================================================
// Test: disabled server via config
// ========================================================================
test(
"disabled server is marked as disabled without attempting connection",
withInstance(
{
"disabled-server": {
type: "local",
command: ["echo", "test"],
enabled: false,
},
},
async () => {
const countBefore = clientCreateCount
await MCP.add("disabled-server", {
type: "local",
command: ["echo", "test"],
enabled: false,
} as any)
// No client should have been created
expect(clientCreateCount).toBe(countBefore)
const status = await MCP.status()
expect(status["disabled-server"]?.status).toBe("disabled")
},
),
)
// ========================================================================
// Test: prompts() and resources()
// ========================================================================
test(
"prompts() returns prompts from connected servers",
withInstance(
{
"prompt-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "prompt-server"
const serverState = getOrCreateClientState("prompt-server")
serverState.prompts = [{ name: "my-prompt", description: "A test prompt" }]
await MCP.add("prompt-server", {
type: "local",
command: ["echo", "test"],
})
const prompts = await MCP.prompts()
expect(Object.keys(prompts).length).toBe(1)
const key = Object.keys(prompts)[0]
expect(key).toContain("prompt-server")
expect(key).toContain("my-prompt")
},
),
)
test(
"resources() returns resources from connected servers",
withInstance(
{
"resource-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "resource-server"
const serverState = getOrCreateClientState("resource-server")
serverState.resources = [{ name: "my-resource", uri: "file:///test.txt", description: "A test resource" }]
await MCP.add("resource-server", {
type: "local",
command: ["echo", "test"],
})
const resources = await MCP.resources()
expect(Object.keys(resources).length).toBe(1)
const key = Object.keys(resources)[0]
expect(key).toContain("resource-server")
expect(key).toContain("my-resource")
},
),
)
test(
"prompts() skips disconnected servers",
withInstance(
{
"prompt-disc-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "prompt-disc-server"
const serverState = getOrCreateClientState("prompt-disc-server")
serverState.prompts = [{ name: "hidden-prompt", description: "Should not appear" }]
await MCP.add("prompt-disc-server", {
type: "local",
command: ["echo", "test"],
})
await MCP.disconnect("prompt-disc-server")
const prompts = await MCP.prompts()
expect(Object.keys(prompts).length).toBe(0)
},
),
)
// ========================================================================
// Test: connect() on nonexistent server
// ========================================================================
test(
"connect() on nonexistent server does not throw",
withInstance({}, async () => {
// Should not throw
await MCP.connect("nonexistent")
const status = await MCP.status()
expect(status["nonexistent"]).toBeUndefined()
}),
)
// ========================================================================
// Test: disconnect() on nonexistent server
// ========================================================================
test(
"disconnect() on nonexistent server does not throw",
withInstance({}, async () => {
await MCP.disconnect("nonexistent")
// Should complete without error
}),
)
// ========================================================================
// Test: tools() with no MCP servers configured
// ========================================================================
test(
"tools() returns empty when no MCP servers are configured",
withInstance({}, async () => {
const tools = await MCP.tools()
expect(Object.keys(tools).length).toBe(0)
}),
)
// ========================================================================
// Test: connect failure during create()
// ========================================================================
test(
"server that fails to connect is marked as failed",
withInstance(
{
"fail-connect": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "fail-connect"
getOrCreateClientState("fail-connect")
connectShouldFail = true
connectError = "Connection refused"
await MCP.add("fail-connect", {
type: "local",
command: ["echo", "test"],
})
const status = await MCP.status()
expect(status["fail-connect"]?.status).toBe("failed")
if (status["fail-connect"]?.status === "failed") {
expect(status["fail-connect"].error).toContain("Connection refused")
}
// No tools should be available
const tools = await MCP.tools()
expect(Object.keys(tools).length).toBe(0)
},
),
)
// ========================================================================
// Bug #5: McpOAuthCallback.cancelPending uses wrong key
// ========================================================================
test("McpOAuthCallback.cancelPending is keyed by mcpName but pendingAuths uses oauthState", async () => {
const { McpOAuthCallback } = await import("../../src/mcp/oauth-callback")
// Register a pending auth with an oauthState key, associated to an mcpName
const oauthState = "abc123hexstate"
const callbackPromise = McpOAuthCallback.waitForCallback(oauthState, "my-mcp-server")
// cancelPending is called with mcpName — should find the entry via reverse index
McpOAuthCallback.cancelPending("my-mcp-server")
// The callback should still be pending because cancelPending looked up
// "my-mcp-server" in a map keyed by "abc123hexstate"
let resolved = false
let rejected = false
callbackPromise.then(() => (resolved = true)).catch(() => (rejected = true))
// Give it a tick
await new Promise((r) => setTimeout(r, 50))
// cancelPending("my-mcp-server") should have rejected the pending callback
expect(rejected).toBe(true)
await McpOAuthCallback.stop()
})
// ========================================================================
// Test: multiple tools from same server get correct name prefixes
// ========================================================================
test(
"tools() prefixes tool names with sanitized server name",
withInstance(
{
"my.special-server": {
type: "local",
command: ["echo", "test"],
},
},
async () => {
lastCreatedClientName = "my.special-server"
const serverState = getOrCreateClientState("my.special-server")
serverState.tools = [
{ name: "tool-a", description: "Tool A", inputSchema: { type: "object", properties: {} } },
{ name: "tool.b", description: "Tool B", inputSchema: { type: "object", properties: {} } },
]
await MCP.add("my.special-server", {
type: "local",
command: ["echo", "test"],
})
const tools = await MCP.tools()
const keys = Object.keys(tools)
// Server name dots should be replaced with underscores
expect(keys.some((k) => k.startsWith("my_special-server_"))).toBe(true)
// Tool name dots should be replaced with underscores
expect(keys.some((k) => k.endsWith("tool_b"))).toBe(true)
expect(keys.length).toBe(2)
},
),
)

View File

@@ -2,9 +2,7 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Effect, Layer, ManagedRuntime } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
@@ -17,28 +15,26 @@ const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe
// Helpers
// ---------------------------------------------------------------------------
function withVcs(
directory: string,
body: (rt: ManagedRuntime.ManagedRuntime<FileWatcher.Service | Vcs.Service, never>) => Promise<void>,
) {
return withServices(
async function withVcs(directory: string, body: () => Promise<void>) {
return Instance.provide({
directory,
Layer.merge(FileWatcher.layer, Vcs.defaultLayer),
async (rt) => {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await rt.runPromise(Vcs.Service.use((s) => s.init()))
fn: async () => {
FileWatcher.init()
Vcs.init()
await Bun.sleep(500)
await body(rt)
await body()
},
{ provide: [watcherConfigLayer] },
)
})
}
function withVcsOnly(
directory: string,
body: (rt: ManagedRuntime.ManagedRuntime<Vcs.Service, never>) => Promise<void>,
) {
return withServices(directory, Vcs.defaultLayer, body)
function withVcsOnly(directory: string, body: () => Promise<void>) {
return Instance.provide({
directory,
fn: async () => {
Vcs.init()
await body()
},
})
}
type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
@@ -82,8 +78,8 @@ describeVcs("Vcs", () => {
test("branch() returns current branch name", async () => {
await using tmp = await tmpdir({ git: true })
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
await withVcs(tmp.path, async () => {
const branch = await Vcs.branch()
expect(branch).toBeDefined()
expect(typeof branch).toBe("string")
})
@@ -92,8 +88,8 @@ describeVcs("Vcs", () => {
test("branch() returns undefined for non-git directories", async () => {
await using tmp = await tmpdir()
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
await withVcs(tmp.path, async () => {
const branch = await Vcs.branch()
expect(branch).toBeUndefined()
})
})
@@ -119,14 +115,14 @@ describeVcs("Vcs", () => {
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async (rt) => {
await withVcs(tmp.path, async () => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
await pending
const current = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
const current = await Vcs.branch()
expect(current).toBe(branch)
})
})
@@ -141,8 +137,8 @@ describe("Vcs diff", () => {
await using tmp = await tmpdir({ git: true })
await $`git branch -M main`.cwd(tmp.path).quiet()
await withVcsOnly(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
await withVcsOnly(tmp.path, async () => {
const branch = await Vcs.defaultBranch()
expect(branch).toBe("main")
})
})
@@ -152,8 +148,8 @@ describe("Vcs diff", () => {
await $`git branch -M trunk`.cwd(tmp.path).quiet()
await $`git config init.defaultBranch trunk`.cwd(tmp.path).quiet()
await withVcsOnly(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
await withVcsOnly(tmp.path, async () => {
const branch = await Vcs.defaultBranch()
expect(branch).toBe("trunk")
})
})
@@ -165,11 +161,8 @@ describe("Vcs diff", () => {
const dir = path.join(wt.path, "feature")
await $`git worktree add -b feature/test ${dir} HEAD`.cwd(tmp.path).quiet()
await withVcsOnly(dir, async (rt) => {
const [branch, base] = await Promise.all([
rt.runPromise(Vcs.Service.use((s) => s.branch())),
rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())),
])
await withVcsOnly(dir, async () => {
const [branch, base] = await Promise.all([Vcs.branch(), Vcs.defaultBranch()])
expect(branch).toBe("feature/test")
expect(base).toBe("main")
})
@@ -182,8 +175,8 @@ describe("Vcs diff", () => {
await $`git commit --no-gpg-sign -m "add file"`.cwd(tmp.path).quiet()
await fs.writeFile(path.join(tmp.path, "file.txt"), "changed\n", "utf-8")
await withVcsOnly(tmp.path, async (rt) => {
const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
await withVcsOnly(tmp.path, async () => {
const diff = await Vcs.diff("git")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -199,8 +192,8 @@ describe("Vcs diff", () => {
await using tmp = await tmpdir({ git: true })
await fs.writeFile(path.join(tmp.path, weird), "hello\n", "utf-8")
await withVcsOnly(tmp.path, async (rt) => {
const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
await withVcsOnly(tmp.path, async () => {
const diff = await Vcs.diff("git")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -220,8 +213,8 @@ describe("Vcs diff", () => {
await $`git add .`.cwd(tmp.path).quiet()
await $`git commit --no-gpg-sign -m "branch file"`.cwd(tmp.path).quiet()
await withVcsOnly(tmp.path, async (rt) => {
const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("branch")))
await withVcsOnly(tmp.path, async () => {
const diff = await Vcs.diff("branch")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({

View File

@@ -110,10 +110,16 @@ describe("SyncEvent", () => {
type: string
properties: { id: string; name: string }
}> = []
const unsub = Bus.subscribeAll((event) => events.push(event))
const received = new Promise<void>((resolve) => {
Bus.subscribeAll((event) => {
events.push(event)
resolve()
})
})
SyncEvent.run(Created, { id: "evt_1", name: "test" })
await received
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
@@ -122,8 +128,6 @@ describe("SyncEvent", () => {
name: "test",
},
})
unsub()
}),
)
})

View File

@@ -89,7 +89,6 @@ describe("tool.edit", () => {
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -102,9 +101,7 @@ describe("tool.edit", () => {
ctx,
)
expect(events).toContain("edited")
expect(events).toContain("updated")
unsubEdited()
unsubUpdated()
},
})
@@ -305,11 +302,9 @@ describe("tool.edit", () => {
await FileTime.read(ctx.sessionID, filepath)
const { Bus } = await import("../../src/bus")
const { File } = await import("../../src/file")
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -322,9 +317,7 @@ describe("tool.edit", () => {
ctx,
)
expect(events).toContain("edited")
expect(events).toContain("updated")
unsubEdited()
unsubUpdated()
},
})