Compare commits

..

18 Commits

Author SHA1 Message Date
Kit Langton
211e76523e test(bus): add Effect-native test for InstanceDisposed delivery on disposal 2026-03-22 18:48:32 -04:00
Kit Langton
c15bf90847 docs: add acquireRelease example for non-Bus resources 2026-03-22 13:08:58 -04:00
Kit Langton
822e2922a3 refactor test fixtures: provideTmpdirInstance, delete withServices, rewrite format tests
- Add provideInstance, provideTmpdirInstance to test/fixture/fixture.ts
- Delete test/fixture/instance.ts (withServices no longer needed)
- Rewrite format tests to use provideTmpdirInstance + testEffect (fully Effect-native)
- Effect-native bus tests use provideTmpdirInstance
2026-03-22 13:07:15 -04:00
Kit Langton
e241b89eae add runSync safety comment 2026-03-22 12:35:58 -04:00
Kit Langton
34b7173e92 simplify git helper to flatMap 2026-03-22 12:29:44 -04:00
Kit Langton
b7371de859 add tmpdirScoped with ChildProcessSpawner, Effect-native bus tests
- tmpdirScoped uses FileSystem.makeTempDirectoryScoped (auto cleanup on scope close)
- Git init uses ChildProcessSpawner.spawn + ChildProcess.make (proper Effect pattern)
- Effect-native bus tests use withServices + Bus.layer + Effect.scoped + forkScoped
2026-03-22 12:13:51 -04:00
Kit Langton
d1d1c896a0 docs: update AGENTS.md to reflect InstanceState and makeRuntime patterns 2026-03-22 11:55:30 -04:00
Kit Langton
99fc519700 docs: update effect-migration with runCallback, Stream subscription pattern, check off Bus 2026-03-22 11:53:07 -04:00
Kit Langton
aceadf1ef8 restore File.Event.Edited publish for plugin hooks (fire-and-forget) 2026-03-22 11:09:29 -04:00
Kit Langton
ea31863bd9 cleanup: remove unused memoMap import, fix extra blank line 2026-03-22 11:09:29 -04:00
Kit Langton
aeceb372db integrate effectified services with Bus Effect-native API
- Plugin: use bus.subscribeAll() Stream + forkScoped instead of callback facade
- Vcs: use bus.subscribe() Stream + forkScoped instead of acquireRelease callback
- Status: use bus.publish() Effect directly instead of Effect.promise wrapper
- Rewrite Vcs test to use facade functions (shared memoMap ensures singleton Bus)
- Add Bus integration tests for subscribe cleanup and instance disposal
2026-03-22 11:09:28 -04:00
Kit Langton
fcec6216eb remove unnecessary double-check in getOrCreate (Effect.sync is atomic) 2026-03-22 11:09:28 -04:00
Kit Langton
8c559c2b90 fix(bus): prevent race in getOrCreate with double-check after yield 2026-03-22 11:09:28 -04:00
Kit Langton
0f5bd3214d test(bus): add instance isolation and multiple subscriber tests 2026-03-22 11:09:27 -04:00
Kit Langton
ee8025f556 add runCallback to makeRuntime, simplify forkStream, add bus tests
- Add runCallback to makeRuntime (returns interrupt function directly)
- Simplify forkStream to use runCallback instead of runFork + Fiber.interrupt
- Add 6 bus tests covering publish/subscribe, unsubscribe, subscribeAll,
  and InstanceDisposed delivery on instance disposal
2026-03-22 11:09:27 -04:00
Kit Langton
83916ca675 simplify: extract forkStream, fix dead event test subscribers, update docs 2026-03-22 11:09:27 -04:00
Kit Langton
050336f160 effectify Bus service: migrate to Effect PubSub + InstanceState
- Replace manual subscription Map with PubSub.unbounded per instance
- Per-type PubSubs + wildcard PubSub, cleaned up via addFinalizer
- InstanceDisposed published before PubSub shutdown so subscribers see it
- Replace makeRunPromise with makeRuntime (single runtime with runPromise + runFork)
- Update all 19 services to use makeRuntime destructuring
- Legacy facade preserved: publish/subscribe/subscribeAll same signatures
- subscribe/subscribeAll fork stream consumer fibers, return interrupt function
- Extract Format.file() for explicit formatting, remove event-driven subscription
- Inline Format.file() calls in write/edit/apply_patch tools
- Drop Bus.once (zero callers)
2026-03-22 11:09:26 -04:00
Kit Langton
a724b01652 effectify Bus service: migrate to Effect PubSub + InstanceState
- Replace manual subscription Map with PubSub.unbounded per instance
- Per-type PubSubs + wildcard PubSub, cleaned up via addFinalizer
- InstanceDisposed published before PubSub shutdown so subscribers see it
- Add makeRuntime to run-service.ts (single runtime with runPromise + runFork)
- Legacy facade preserved: publish/subscribe/subscribeAll same signatures
- subscribe/subscribeAll fork stream consumer fibers, return interrupt function
- Extract Format.file() for explicit formatting, remove event-driven subscription
- Inline Format.file() calls in write/edit/apply_patch tools
- Drop Bus.once (zero callers)
- Keep makeRunPromise as deprecated wrapper for existing services
2026-03-22 11:09:26 -04:00
45 changed files with 1199 additions and 799 deletions

View File

@@ -31,12 +31,14 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
- Use `Schema.Defect` instead of `unknown` for defect-like causes.
- In `Effect.gen` / `Effect.fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
## Runtime vs Instances
## Runtime vs InstanceState
- Use the shared runtime for process-wide services with one lifecycle for the whole app.
- Use `src/effect/instances.ts` for per-directory or per-project services that need `InstanceContext`, per-instance state, or per-instance cleanup.
- If two open directories should not share one copy of the service, it belongs in `Instances`.
- Instance-scoped services should read context from `InstanceContext`, not `Instance.*` globals.
- Use `makeRuntime` (from `src/effect/run-service.ts`) for all services. It returns `{ runPromise, runFork, runCallback }` backed by a shared `memoMap` that deduplicates layers.
- Use `InstanceState` (from `src/effect/instance-state.ts`) for per-directory or per-project state that needs per-instance cleanup. It uses `ScopedCache` keyed by directory — each open project gets its own state, automatically cleaned up on disposal.
- If two open directories should not share one copy of the service, it needs `InstanceState`.
- Do the work directly in the `InstanceState.make` closure — `ScopedCache` handles run-once semantics. Don't add fibers, `ensure()` callbacks, or `started` flags on top.
- Use `Effect.addFinalizer` or `Effect.acquireRelease` inside the `InstanceState.make` closure for cleanup (subscriptions, process teardown, etc.).
- Use `Effect.forkScoped` inside the closure for background stream consumers — the fiber is interrupted when the instance is disposed.
## Preferred Effect services
@@ -51,7 +53,7 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and restores it synchronously when called.
Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish` or anything that reads `Instance.directory`.
You do not need it for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers.

View File

View File

@@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`.
Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal.
Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`.
Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork, runCallback }`.
- Global services (no per-directory state): Account, Auth, Installation, Truncate
- Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth
@@ -46,7 +46,7 @@ export namespace Foo {
export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer))
// Per-service runtime (inside the namespace)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
// Async facade functions
export async function get(id: FooID) {
@@ -79,29 +79,34 @@ See `Auth.ZodInfo` for the canonical example.
The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for:
- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe:
- **Subscriptions**: Yield `Bus.Service` at the layer level, then use `Stream` + `forkScoped` inside the init closure. The fiber is automatically interrupted when the instance scope closes:
```ts
const cache =
yield *
InstanceState.make<State>(
Effect.fn("Foo.state")(function* (ctx) {
// ... load state ...
const bus = yield* Bus.Service
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribeAll((event) => {
/* handle */
}),
),
(unsub) => Effect.sync(unsub),
const cache = yield* InstanceState.make<State>(
Effect.fn("Foo.state")(function* (ctx) {
// ... load state ...
yield* bus
.subscribeAll()
.pipe(
Stream.runForEach((event) => Effect.sync(() => { /* handle */ })),
Effect.forkScoped,
)
return {
/* state */
}
}),
)
return { /* state */ }
}),
)
```
- **Resource cleanup**: Use `Effect.acquireRelease` or `Effect.addFinalizer` for resources that need teardown (native watchers, process handles, etc.):
```ts
yield* Effect.acquireRelease(
Effect.sync(() => nativeAddon.watch(dir)),
(watcher) => Effect.sync(() => watcher.close()),
)
```
- **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal.
@@ -165,7 +170,7 @@ Still open and likely worth migrating:
- [x] `ToolRegistry`
- [ ] `Pty`
- [ ] `Worktree`
- [ ] `Bus`
- [x] `Bus`
- [x] `Command`
- [ ] `Config`
- [ ] `Session`

View File

@@ -1,7 +1,7 @@
import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { AccountRepo, type AccountRow } from "./repo"
import {
@@ -379,7 +379,7 @@ export namespace Account {
export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer))
export const runPromise = makeRunPromise(Service, defaultLayer)
export const { runPromise } = makeRuntime(Service, defaultLayer)
export async function active(): Promise<Info | undefined> {
return Option.getOrUndefined(await runPromise((service) => service.active()))

View File

@@ -1,6 +1,6 @@
import path from "path"
import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { zod } from "@/util/effect-zod"
import { Global } from "../global"
import { Filesystem } from "../util/filesystem"
@@ -95,7 +95,7 @@ export namespace Auth {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function get(providerID: string) {
return runPromise((service) => service.get(providerID))

View File

@@ -1,12 +1,14 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@@ -15,91 +17,168 @@ export namespace Bus {
}),
)
const state = Instance.state(
() => {
const subscriptions = new Map<any, Subscription[]>()
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
properties: z.infer<D["properties"]>
}
return {
subscriptions,
type State = {
wildcard: PubSub.PubSub<Payload>
typed: Map<string, PubSub.PubSub<Payload>>
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
readonly subscribeCallback: <D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => unknown,
) => Effect.Effect<() => void>
readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const cache = yield* InstanceState.make<State>(
Effect.fn("Bus.state")(function* (ctx) {
const wildcard = yield* PubSub.unbounded<Payload>()
const typed = new Map<string, PubSub.PubSub<Payload>>()
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
// Publish InstanceDisposed before shutting down so subscribers see it
yield* PubSub.publish(wildcard, {
type: InstanceDisposed.type,
properties: { directory: ctx.directory },
})
yield* PubSub.shutdown(wildcard)
for (const ps of typed.values()) {
yield* PubSub.shutdown(ps)
}
}),
)
return { wildcard, typed }
}),
)
function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
return Effect.gen(function* () {
let ps = state.typed.get(def.type)
if (!ps) {
ps = yield* PubSub.unbounded<Payload>()
state.typed.set(def.type, ps)
}
return ps as unknown as PubSub.PubSub<Payload<D>>
})
}
},
async (entry) => {
const wildcard = entry.subscriptions.get("*")
if (!wildcard) return
const event = {
type: InstanceDisposed.type,
properties: {
directory: Instance.directory,
},
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = state.typed.get(def.type)
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(state.wildcard, payload)
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
})
}
for (const sub of [...wildcard]) {
sub(event)
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
const ps = yield* getOrCreate(state, def)
return Stream.fromPubSub(ps)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
},
function subscribeAll(): Stream.Stream<Payload> {
log.info("subscribing", { type: "*" })
return Stream.unwrap(
Effect.gen(function* () {
const state = yield* InstanceState.get(cache)
return Stream.fromPubSub(state.wildcard)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
}
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
log.info("subscribing", { type })
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
yield* Scope.provide(scope)(
Stream.fromSubscription(subscription).pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => Promise.resolve().then(() => callback(msg)),
catch: (cause) => {
log.error("subscriber failed", { type, cause })
},
}).pipe(Effect.ignore),
),
Effect.forkScoped,
),
)
return () => {
log.info("unsubscribing", { type })
Effect.runFork(Scope.close(scope, Exit.void))
}
})
}
const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => unknown,
) {
const state = yield* InstanceState.get(cache)
const ps = yield* getOrCreate(state, def)
return yield* on(ps, def.type, callback)
})
const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
const state = yield* InstanceState.get(cache)
return yield* on(state.wildcard, "*", callback)
})
return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
}),
)
export async function publish<Definition extends BusEvent.Definition>(
def: Definition,
properties: z.output<Definition["properties"]>,
const { runPromise, runSync } = makeRuntime(Service, layer)
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return runPromise((svc) => svc.publish(def, properties))
}
export function subscribe<D extends BusEvent.Definition>(
def: D,
callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
) {
const payload = {
type: def.type,
properties,
}
log.info("publishing", {
type: def.type,
})
const pending = []
for (const key of [def.type, "*"]) {
const match = [...(state().subscriptions.get(key) ?? [])]
for (const sub of match) {
pending.push(sub(payload))
}
}
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
return Promise.all(pending)
return runSync((svc) => svc.subscribeCallback(def, callback))
}
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
}
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: {
type: Definition["type"]
properties: z.infer<Definition["properties"]>
}) => "done" | undefined,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
})
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback)
}
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type })
const subscriptions = state().subscriptions
let match = subscriptions.get(type) ?? []
match.push(callback)
subscriptions.set(type, match)
return () => {
log.info("unsubscribing", { type })
const match = subscriptions.get(type)
if (!match) return
const index = match.indexOf(callback)
if (index === -1) return
match.splice(index, 1)
}
export function subscribeAll(callback: (event: any) => unknown) {
return runSync((svc) => svc.subscribeAllCallback(callback))
}
}

View File

@@ -5,8 +5,8 @@ import { MouseButton, TextAttributes } from "@opentui/core"
import { RouteProvider, useRoute } from "@tui/context/route"
import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, batch, Show, on } from "solid-js"
import { win32DisableProcessedInput, win32FlushInputBuffer, win32InstallCtrlCGuard } from "./win32"
import { Installation } from "@/installation"
import { Flag } from "@/flag/flag"
import semver from "semver"
import { DialogProvider, useDialog } from "@tui/ui/dialog"
import { DialogProvider as DialogProviderList } from "@tui/component/dialog-provider"
import { SDKProvider, useSDK } from "@tui/context/sdk"
@@ -29,7 +29,6 @@ import { PromptHistoryProvider } from "./component/prompt/history"
import { FrecencyProvider } from "./component/prompt/frecency"
import { PromptStashProvider } from "./component/prompt/stash"
import { DialogAlert } from "./ui/dialog-alert"
import { DialogConfirm } from "./ui/dialog-confirm"
import { ToastProvider, useToast } from "./ui/toast"
import { ExitProvider, useExit } from "./context/exit"
import { Session as SessionApi } from "@/session"
@@ -104,7 +103,6 @@ async function getTerminalBackgroundColor(): Promise<"dark" | "light"> {
}
import type { EventSource } from "./context/sdk"
import { Installation } from "@/installation"
export function tui(input: {
url: string
@@ -731,49 +729,13 @@ function App() {
})
})
sdk.event.on("installation.update-available", async (evt) => {
const version = evt.properties.version
const skipped = kv.get("skipped_version")
if (skipped && !semver.gt(version, skipped)) return
const confirmed = await DialogConfirm.show(
dialog,
`Update Available`,
`A new release v${version} is available. Would you like to update now?`,
"skip",
)
if (!confirmed) {
kv.set("skipped_version", version)
return
}
sdk.event.on(Installation.Event.UpdateAvailable.type, (evt) => {
toast.show({
variant: "info",
message: `Updating to v${version}...`,
duration: 30000,
title: "Update Available",
message: `OpenCode v${evt.properties.version} is available. Run 'opencode upgrade' to update manually.`,
duration: 10000,
})
const result = await sdk.client.global.upgrade({ target: version })
if (result.error || !result.data?.success) {
toast.show({
variant: "error",
title: "Update Failed",
message: "Update failed",
duration: 10000,
})
return
}
await DialogAlert.show(
dialog,
"Update Complete",
`Successfully updated to OpenCode v${result.data.version}. Please restart the application.`,
)
exit()
})
return (

View File

@@ -11,7 +11,6 @@ export type DialogConfirmProps = {
message: string
onConfirm?: () => void
onCancel?: () => void
label?: string
}
export function DialogConfirm(props: DialogConfirmProps) {
@@ -46,7 +45,7 @@ export function DialogConfirm(props: DialogConfirmProps) {
<text fg={theme.textMuted}>{props.message}</text>
</box>
<box flexDirection="row" justifyContent="flex-end" paddingBottom={1}>
<For each={["cancel", "confirm"] as const}>
<For each={["cancel", "confirm"]}>
{(key) => (
<box
paddingLeft={1}
@@ -59,7 +58,7 @@ export function DialogConfirm(props: DialogConfirmProps) {
}}
>
<text fg={key === store.active ? theme.selectedListItemText : theme.textMuted}>
{Locale.titlecase(key === "cancel" ? (props.label ?? key) : key)}
{Locale.titlecase(key)}
</text>
</box>
)}
@@ -69,7 +68,7 @@ export function DialogConfirm(props: DialogConfirmProps) {
)
}
DialogConfirm.show = (dialog: DialogContext, title: string, message: string, label?: string) => {
DialogConfirm.show = (dialog: DialogContext, title: string, message: string) => {
return new Promise<boolean>((resolve) => {
dialog.replace(
() => (
@@ -78,7 +77,6 @@ DialogConfirm.show = (dialog: DialogContext, title: string, message: string, lab
message={message}
onConfirm={() => resolve(true)}
onCancel={() => resolve(false)}
label={label}
/>
),
() => resolve(false),

View File

@@ -8,18 +8,12 @@ export async function upgrade() {
const method = await Installation.method()
const latest = await Installation.latest(method).catch(() => {})
if (!latest) return
if (Installation.VERSION === latest) return
if (Flag.OPENCODE_ALWAYS_NOTIFY_UPDATE) {
await Bus.publish(Installation.Event.UpdateAvailable, { version: latest })
if (config.autoupdate === false || Flag.OPENCODE_DISABLE_AUTOUPDATE) {
return
}
if (Installation.VERSION === latest) return
if (config.autoupdate === false || Flag.OPENCODE_DISABLE_AUTOUPDATE) return
const kind = Installation.getReleaseType(Installation.VERSION, latest)
if (config.autoupdate === "notify" || kind !== "patch") {
if (config.autoupdate === "notify") {
await Bus.publish(Installation.Event.UpdateAvailable, { version: latest })
return
}

View File

@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -173,7 +173,7 @@ export namespace Command {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function get(name: string) {
return runPromise((svc) => svc.get(name))

View File

@@ -1,14 +0,0 @@
import { ServiceMap } from "effect"
import type { Project } from "@/project/project"
export declare namespace InstanceContext {
export interface Shape {
readonly directory: string
readonly worktree: string
readonly project: Project.Info
}
}
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
"opencode/InstanceContext",
) {}

View File

@@ -3,11 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
export const memoMap = Layer.makeMemoMapUnsafe()
export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
rt ??= ManagedRuntime.make(layer, { memoMap })
return rt.runPromise(service.use(fn), options)
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
}
}

View File

@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { git } from "@/util/git"
import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
import { formatPatch, structuredPatch } from "diff"
@@ -688,7 +688,7 @@ export namespace File {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -1,6 +1,6 @@
import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import type { SessionID } from "@/session/schema"
import { Filesystem } from "../util/filesystem"
@@ -108,7 +108,7 @@ export namespace FileTime {
}),
).pipe(Layer.orDie)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function read(sessionID: SessionID, file: string) {
return runPromise((s) => s.read(sessionID, file))

View File

@@ -8,7 +8,7 @@ import z from "zod"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Instance } from "@/project/instance"
import { git } from "@/util/git"
@@ -159,7 +159,7 @@ export namespace FileWatcher {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -18,7 +18,6 @@ export namespace Flag {
export declare const OPENCODE_CONFIG_DIR: string | undefined
export const OPENCODE_CONFIG_CONTENT = process.env["OPENCODE_CONFIG_CONTENT"]
export const OPENCODE_DISABLE_AUTOUPDATE = truthy("OPENCODE_DISABLE_AUTOUPDATE")
export const OPENCODE_ALWAYS_NOTIFY_UPDATE = truthy("OPENCODE_ALWAYS_NOTIFY_UPDATE")
export const OPENCODE_DISABLE_PRUNE = truthy("OPENCODE_DISABLE_PRUNE")
export const OPENCODE_DISABLE_TERMINAL_TITLE = truthy("OPENCODE_DISABLE_TERMINAL_TITLE")
export const OPENCODE_PERMISSION = process.env["OPENCODE_PERMISSION"]

View File

@@ -1,12 +1,10 @@
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import path from "path"
import { mergeDeep } from "remeda"
import z from "zod"
import { Bus } from "../bus"
import { Config } from "../config/config"
import { File } from "../file"
import { Instance } from "../project/instance"
import { Process } from "../util/process"
import { Log } from "../util/log"
@@ -29,6 +27,7 @@ export namespace Format {
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
readonly file: (filepath: string) => Effect.Effect<void>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Format") {}
@@ -97,53 +96,46 @@ export namespace Format {
return checks.filter((x) => x.enabled).map((x) => x.item)
}
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
File.Event.Edited,
Instance.bind(async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
async function formatFile(filepath: string) {
log.info("formatting", { file: filepath })
const ext = path.extname(filepath)
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", filepath)),
{
cwd: Instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file: filepath,
})
}
}
}
for (const item of await getFormatter(ext)) {
log.info("running", { command: item.command })
try {
const proc = Process.spawn(
item.command.map((x) => x.replace("$FILE", file)),
{
cwd: Instance.directory,
env: { ...process.env, ...item.environment },
stdout: "ignore",
stderr: "ignore",
},
)
const exit = await proc.exited
if (exit !== 0) {
log.error("failed", {
command: item.command,
...item.environment,
})
}
} catch (error) {
log.error("failed to format file", {
error,
command: item.command,
...item.environment,
file,
})
}
}
}),
),
),
(unsubscribe) => Effect.sync(unsubscribe),
)
log.info("init")
return {
formatters,
isEnabled,
formatFile,
}
}),
)
@@ -166,11 +158,16 @@ export namespace Format {
return result
})
return Service.of({ init, status })
const file = Effect.fn("Format.file")(function* (filepath: string) {
const { formatFile } = yield* InstanceState.get(state)
yield* Effect.promise(() => formatFile(filepath))
})
return Service.of({ init, status, file })
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function init() {
return runPromise((s) => s.init())
@@ -179,4 +176,8 @@ export namespace Format {
export async function status() {
return runPromise((s) => s.status())
}
export async function file(filepath: string) {
return runPromise((s) => s.file(filepath))
}
}

View File

@@ -1,7 +1,7 @@
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import path from "path"
@@ -15,15 +15,11 @@ declare global {
const OPENCODE_CHANNEL: string
}
import semver from "semver"
export namespace Installation {
const log = Log.create({ service: "installation" })
export type Method = "curl" | "npm" | "yarn" | "pnpm" | "bun" | "brew" | "scoop" | "choco" | "unknown"
export type ReleaseType = "patch" | "minor" | "major"
export const Event = {
Updated: BusEvent.define(
"installation.updated",
@@ -39,17 +35,6 @@ export namespace Installation {
),
}
export function getReleaseType(current: string, latest: string): ReleaseType {
const currMajor = semver.major(current)
const currMinor = semver.minor(current)
const newMajor = semver.major(latest)
const newMinor = semver.minor(latest)
if (newMajor > currMajor) return "major"
if (newMinor > currMinor) return "minor"
return "patch"
}
export const Info = z
.object({
version: z.string(),
@@ -345,7 +330,7 @@ export namespace Installation {
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function info(): Promise<Info> {
return runPromise((svc) => svc.info())

View File

@@ -2,7 +2,7 @@ import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { Config } from "@/config/config"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { ProjectID } from "@/project/schema"
import { Instance } from "@/project/instance"
import { MessageID, SessionID } from "@/session/schema"
@@ -306,7 +306,7 @@ export namespace Permission {
return result
}
export const runPromise = makeRunPromise(Service, layer)
export const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: z.infer<typeof AskInput>) {
return runPromise((s) => s.ask(input))

View File

@@ -11,9 +11,9 @@ import { Session } from "../session"
import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { Effect, Layer, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace Plugin {
const log = Log.create({ service: "plugin" })
@@ -52,6 +52,8 @@ export namespace Plugin {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const cache = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
@@ -140,17 +142,19 @@ export namespace Plugin {
}
})
// Subscribe to bus events, clean up when scope is closed
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribeAll(async (input) => {
for (const hook of hooks) {
hook["event"]?.({ event: input })
}
}),
),
(unsub) => Effect.sync(unsub),
)
// Subscribe to bus events, fiber interrupted when scope closes
yield* bus
.subscribeAll()
.pipe(
Stream.runForEach((input) =>
Effect.sync(() => {
for (const hook of hooks) {
hook["event"]?.({ event: input as any })
}
}),
),
Effect.forkScoped,
)
return { hooks }
}),
@@ -186,7 +190,8 @@ export namespace Plugin {
}),
)
const runPromise = makeRunPromise(Service, layer)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function trigger<
Name extends TriggerName,

View File

@@ -1,8 +1,8 @@
import { Effect, Layer, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { FileWatcher } from "@/file/watcher"
import { Log } from "@/util/log"
import { git } from "@/util/git"
@@ -44,6 +44,8 @@ export namespace Vcs {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const state = yield* InstanceState.make<State>(
Effect.fn("Vcs.state")((ctx) =>
Effect.gen(function* () {
@@ -65,23 +67,22 @@ export namespace Vcs {
}
log.info("initialized", { branch: value.current })
yield* Effect.acquireRelease(
Effect.sync(() =>
Bus.subscribe(
FileWatcher.Event.Updated,
Instance.bind(async (evt) => {
if (!evt.properties.file.endsWith("HEAD")) return
const next = await getCurrentBranch()
yield* bus
.subscribe(FileWatcher.Event.Updated)
.pipe(
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
Stream.runForEach((evt) =>
Effect.gen(function* () {
const next = yield* Effect.promise(() => getCurrentBranch())
if (next !== value.current) {
log.info("branch changed", { from: value.current, to: next })
value.current = next
Bus.publish(Event.BranchUpdated, { branch: next })
yield* bus.publish(Event.BranchUpdated, { branch: next })
}
}),
),
),
(unsubscribe) => Effect.sync(unsubscribe),
)
Effect.forkScoped,
)
return value
}),
@@ -99,7 +100,8 @@ export namespace Vcs {
}),
)
const runPromise = makeRunPromise(Service, layer)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export function init() {
return runPromise((svc) => svc.init())

View File

@@ -2,7 +2,7 @@ import type { AuthOuathResult, Hooks } from "@opencode-ai/plugin"
import { NamedError } from "@opencode-ai/util/error"
import { Auth } from "@/auth"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Plugin } from "../plugin"
import { ProviderID } from "./schema"
import { Array as Arr, Effect, Layer, Record, Result, ServiceMap } from "effect"
@@ -231,7 +231,7 @@ export namespace ProviderAuth {
export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function methods() {
return runPromise((svc) => svc.methods())

View File

@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Instance } from "@/project/instance"
import { type IPty } from "bun-pty"
import z from "zod"
@@ -361,7 +361,7 @@ export namespace Pty {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function list() {
return runPromise((svc) => svc.list())

View File

@@ -2,7 +2,7 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Log } from "@/util/log"
import z from "zod"
@@ -197,7 +197,7 @@ export namespace Question {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: {
sessionID: SessionID

View File

@@ -1,8 +1,7 @@
import { Hono } from "hono"
import { describeRoute, validator, resolver } from "hono-openapi"
import { describeRoute, resolver, validator } from "hono-openapi"
import { streamSSE } from "hono/streaming"
import z from "zod"
import { Bus } from "../../bus"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { AsyncQueue } from "@/util/queue"
@@ -196,62 +195,5 @@ export const GlobalRoutes = lazy(() =>
})
return c.json(true)
},
)
.post(
"/upgrade",
describeRoute({
summary: "Upgrade opencode",
description: "Upgrade opencode to the specified version or latest if not specified.",
operationId: "global.upgrade",
responses: {
200: {
description: "Upgrade result",
content: {
"application/json": {
schema: resolver(
z.union([
z.object({
success: z.literal(true),
version: z.string(),
}),
z.object({
success: z.literal(false),
error: z.string(),
}),
]),
),
},
},
},
...errors(400),
},
}),
validator(
"json",
z.object({
target: z.string().optional(),
}),
),
async (c) => {
const method = await Installation.method()
if (method === "unknown") {
return c.json({ success: false, error: "Unknown installation method" }, 400)
}
const target = c.req.valid("json").target || (await Installation.latest(method))
const result = await Installation.upgrade(method, target)
.then(() => ({ success: true as const, version: target }))
.catch((e) => ({ success: false as const, error: e instanceof Error ? e.message : String(e) }))
if (result.success) {
GlobalBus.emit("event", {
directory: "global",
payload: {
type: Installation.Event.Updated.type,
properties: { version: target },
},
})
return c.json(result)
}
return c.json(result, 500)
},
),
)

View File

@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -55,6 +55,8 @@ export namespace SessionStatus {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const state = yield* InstanceState.make(
Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
)
@@ -70,9 +72,9 @@ export namespace SessionStatus {
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
const data = yield* InstanceState.get(state)
yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status }))
yield* bus.publish(Event.Status, { sessionID, status })
if (status.type === "idle") {
yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID }))
yield* bus.publish(Event.Idle, { sessionID })
data.delete(sessionID)
return
}
@@ -83,7 +85,8 @@ export namespace SessionStatus {
}),
)
const runPromise = makeRunPromise(Service, layer)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(sessionID: SessionID) {
return runPromise((svc) => svc.get(sessionID))

View File

@@ -7,7 +7,7 @@ import { NamedError } from "@opencode-ai/util/error"
import type { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Global } from "@/global"
import { Permission } from "@/permission"
@@ -242,7 +242,7 @@ export namespace Skill {
return ["## Available Skills", ...list.map((skill) => `- **${skill.name}**: ${skill.description}`)].join("\n")
}
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(name: string) {
return runPromise((skill) => skill.get(name))

View File

@@ -4,7 +4,7 @@ import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import path from "path"
import z from "zod"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { Config } from "../config/config"
import { Global } from "../global"
@@ -360,7 +360,7 @@ export namespace Snapshot {
Layer.provide(NodePath.layer),
)
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function init() {
return runPromise((svc) => svc.init())

View File

@@ -13,6 +13,7 @@ import { LSP } from "../lsp"
import { Filesystem } from "../util/filesystem"
import DESCRIPTION from "./apply_patch.txt"
import { File } from "../file"
import { Format } from "../format"
const PatchParams = z.object({
patchText: z.string().describe("The full patch text that describes all changes to be made"),
@@ -220,9 +221,8 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
}
if (edited) {
await Bus.publish(File.Event.Edited, {
file: edited,
})
await Format.file(edited)
Bus.publish(File.Event.Edited, { file: edited })
}
}

View File

@@ -12,6 +12,7 @@ import DESCRIPTION from "./edit.txt"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Bus } from "../bus"
import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -71,9 +72,8 @@ export const EditTool = Tool.define("edit", {
},
})
await Filesystem.write(filePath, params.newString)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
await Format.file(filePath)
Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: existed ? "change" : "add",
@@ -108,9 +108,8 @@ export const EditTool = Tool.define("edit", {
})
await Filesystem.write(filePath, contentNew)
await Bus.publish(File.Event.Edited, {
file: filePath,
})
await Format.file(filePath)
Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: "change",

View File

@@ -31,7 +31,7 @@ import { Glob } from "../util/glob"
import { pathToFileURL } from "url"
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
export namespace ToolRegistry {
const log = Log.create({ service: "tool.registry" })
@@ -198,7 +198,7 @@ export namespace ToolRegistry {
}),
)
const runPromise = makeRunPromise(Service, layer)
const { runPromise } = makeRuntime(Service, layer)
export async function register(tool: Tool.Info) {
return runPromise((svc) => svc.register(tool))

View File

@@ -2,7 +2,7 @@ import { NodePath } from "@effect/platform-node"
import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect"
import path from "path"
import type { Agent } from "../agent/agent"
import { makeRunPromise } from "@/effect/run-service"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { evaluate } from "@/permission/evaluate"
import { Identifier } from "../id/id"
@@ -136,7 +136,7 @@ export namespace Truncate {
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer))
const runPromise = makeRunPromise(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function output(text: string, options: Options = {}, agent?: Agent.Info): Promise<Result> {
return runPromise((s) => s.output(text, options, agent))

View File

@@ -7,6 +7,7 @@ import DESCRIPTION from "./write.txt"
import { Bus } from "../bus"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -42,9 +43,8 @@ export const WriteTool = Tool.define("write", {
})
await Filesystem.write(filepath, params.content)
await Bus.publish(File.Event.Edited, {
file: filepath,
})
await Format.file(filepath)
Bus.publish(File.Event.Edited, { file: filepath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filepath,
event: exists ? "change" : "add",

View File

@@ -0,0 +1,164 @@
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Stream } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const TestEvent = {
Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
}
const node = NodeChildProcessSpawner.layer.pipe(
Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
)
const live = Layer.mergeAll(Bus.layer, node)
const it = testEffect(live)
describe("Bus (Effect-native)", () => {
it.effect("publish + subscribe stream delivers events", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Deferred.await(done)
expect(received).toEqual([1, 2])
}),
),
)
it.effect("subscribe filters by event type", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Pong, { message: "ignored" })
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done)
expect(pings).toEqual([42])
}),
),
)
it.effect("subscribeAll receives all types", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const types: string[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
expect(types).toContain("test.effect.ping")
expect(types).toContain("test.effect.pong")
}),
),
)
it.effect("multiple subscribers each receive the event", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const a: number[] = []
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(doneA)
yield* Deferred.await(doneB)
expect(a).toEqual([99])
expect(b).toEqual([99])
}),
),
)
it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
const types: string[] = []
const seen = yield* Deferred.make<void>()
const disposed = yield* Deferred.make<void>()
// Set up subscriber inside the instance
yield* Effect.gen(function* () {
const bus = yield* Bus.Service
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* Deferred.await(seen)
}).pipe(provideInstance(dir))
// Dispose from OUTSIDE the instance scope
yield* Effect.promise(() => Instance.disposeAll())
yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
expect(types).toContain("test.effect.ping")
expect(types).toContain(Bus.InstanceDisposed.type)
}),
)
})

View File

@@ -0,0 +1,87 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() }))
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
describe("Bus integration: acquireRelease subscriber pattern", () => {
afterEach(() => Instance.disposeAll())
test("subscriber via callback facade receives events and cleans up on unsub", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bus.publish(TestEvent, { value: 2 })
await Bun.sleep(10)
expect(received).toEqual([1, 2])
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 3 })
await Bun.sleep(10)
expect(received).toEqual([1, 2])
})
})
test("subscribeAll receives events from multiple types", async () => {
await using tmp = await tmpdir()
const received: Array<{ type: string; value?: number }> = []
const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() }))
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push({ type: evt.type, value: evt.properties.value })
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 10 })
await Bus.publish(OtherEvent, { value: 20 })
await Bun.sleep(10)
})
expect(received).toEqual([
{ type: "test.integration", value: 10 },
{ type: "test.other", value: 20 },
])
})
test("subscriber cleanup on instance disposal interrupts the stream", async () => {
await using tmp = await tmpdir()
const received: number[] = []
let disposed = false
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
if (evt.type === Bus.InstanceDisposed.type) {
disposed = true
return
}
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent, { value: 1 })
await Bun.sleep(10)
})
await Instance.disposeAll()
await Bun.sleep(50)
expect(received).toEqual([1])
expect(disposed).toBe(true)
})
})

View File

@@ -0,0 +1,219 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = {
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
describe("Bus", () => {
afterEach(() => Instance.disposeAll())
describe("publish + subscribe", () => {
test("subscriber is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bun.sleep(10)
})
expect(received).toEqual([42])
})
test("subscriber receives matching events", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
// Give the subscriber fiber time to start consuming
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 42 })
await Bus.publish(TestEvent.Ping, { value: 99 })
// Give subscriber time to process
await Bun.sleep(10)
})
expect(received).toEqual([42, 99])
})
test("subscriber does not receive events of other types", async () => {
await using tmp = await tmpdir()
const pings: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
pings.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Pong, { message: "hello" })
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
expect(pings).toEqual([1])
})
test("publish with no subscribers does not throw", async () => {
await using tmp = await tmpdir()
await withInstance(tmp.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
})
describe("unsubscribe", () => {
test("unsubscribe stops delivery", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
unsub()
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
expect(received).toEqual([1])
})
})
describe("subscribeAll", () => {
test("subscribeAll is live immediately after subscribe returns", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
expect(received).toEqual(["test.ping"])
})
test("receives all event types", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Pong, { message: "hi" })
await Bun.sleep(10)
})
expect(received).toContain("test.ping")
expect(received).toContain("test.pong")
})
})
describe("multiple subscribers", () => {
test("all subscribers for same event type are called", async () => {
await using tmp = await tmpdir()
const a: number[] = []
const b: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
a.push(evt.properties.value)
})
Bus.subscribe(TestEvent.Ping, (evt) => {
b.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 7 })
await Bun.sleep(10)
})
expect(a).toEqual([7])
expect(b).toEqual([7])
})
})
describe("instance isolation", () => {
test("events in one directory do not reach subscribers in another", async () => {
await using tmpA = await tmpdir()
await using tmpB = await tmpdir()
const receivedA: number[] = []
const receivedB: number[] = []
await withInstance(tmpA.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedA.push(evt.properties.value)
})
await Bun.sleep(10)
})
await withInstance(tmpB.path, async () => {
Bus.subscribe(TestEvent.Ping, (evt) => {
receivedB.push(evt.properties.value)
})
await Bun.sleep(10)
})
await withInstance(tmpA.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
await withInstance(tmpB.path, async () => {
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bun.sleep(10)
})
expect(receivedA).toEqual([1])
expect(receivedB).toEqual([2])
})
})
describe("instance disposal", () => {
test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
await using tmp = await tmpdir()
const received: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
received.push(evt.type)
})
await Bun.sleep(10)
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bun.sleep(10)
})
// Instance.disposeAll triggers the finalizer which publishes InstanceDisposed
await Instance.disposeAll()
await Bun.sleep(50)
expect(received).toContain("test.ping")
expect(received).toContain(Bus.InstanceDisposed.type)
})
})
})

View File

@@ -1,10 +1,10 @@
import { expect, test } from "bun:test"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRunPromise } from "../../src/effect/run-service"
import { makeRuntime } from "../../src/effect/run-service"
class Shared extends ServiceMap.Service<Shared, { readonly id: number }>()("@test/Shared") {}
test("makeRunPromise shares dependent layers through the shared memo map", async () => {
test("makeRuntime shares dependent layers through the shared memo map", async () => {
let n = 0
const shared = Layer.effect(
@@ -37,8 +37,8 @@ test("makeRunPromise shares dependent layers through the shared memo map", async
}),
).pipe(Layer.provide(shared))
const runOne = makeRunPromise(One, one)
const runTwo = makeRunPromise(Two, two)
const { runPromise: runOne } = makeRuntime(One, one)
const { runPromise: runTwo } = makeRuntime(Two, two)
expect(await runOne((svc) => svc.get())).toBe(1)
expect(await runTwo((svc) => svc.get())).toBe(1)

View File

@@ -2,9 +2,8 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Deferred, Effect, Option } from "effect"
import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
@@ -16,20 +15,33 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
return withServices(
return Instance.provide({
directory,
FileWatcher.layer,
async (rt) => {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
fn: async () => {
const layer: Layer.Layer<FileWatcher.Service, never, never> = FileWatcher.layer.pipe(
Layer.provide(watcherConfigLayer),
)
const rt = ManagedRuntime.make(layer)
try {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
} finally {
await rt.dispose()
}
},
{ provide: [watcherConfigLayer] },
)
})
}
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {

View File

@@ -2,7 +2,10 @@ import { $ } from "bun"
import * as fs from "fs/promises"
import os from "os"
import path from "path"
import { Effect, FileSystem, ServiceMap } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "../../src/config/config"
import { Instance } from "../../src/project/instance"
// Strip null bytes from paths (defensive fix for CI environment issues)
function sanitizePath(p: string): string {
@@ -71,3 +74,68 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
}
return result
}
/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
return Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
const git = (...args: string[]) =>
spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
if (options?.git) {
yield* git("init")
yield* git("config", "core.fsmonitor", "false")
yield* git("config", "user.email", "test@opencode.test")
yield* git("config", "user.name", "Test")
yield* git("commit", "--allow-empty", "-m", "root commit")
}
if (options?.config) {
yield* fs.writeFileString(
path.join(dir, "opencode.json"),
JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
)
}
return dir
})
}
export const provideInstance =
(directory: string) =>
<A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
Effect.servicesWith((services: ServiceMap.ServiceMap<R>) =>
Effect.promise<A>(async () =>
Instance.provide({
directory,
fn: () => Effect.runPromiseWith(services)(self),
}),
),
)
export function provideTmpdirInstance<A, E, R>(
self: (path: string) => Effect.Effect<A, E, R>,
options?: { git?: boolean; config?: Partial<Config.Info> },
) {
return Effect.gen(function* () {
const path = yield* tmpdirScoped(options)
let provided = false
yield* Effect.addFinalizer(() =>
provided
? Effect.promise(() =>
Instance.provide({
directory: path,
fn: () => Instance.dispose(),
}),
).pipe(Effect.ignore)
: Effect.void,
)
provided = true
return yield* self(path).pipe(provideInstance(path))
})
}

View File

@@ -1,51 +0,0 @@
import { ConfigProvider, Layer, ManagedRuntime } from "effect"
import { InstanceContext } from "../../src/effect/instance-context"
import { Instance } from "../../src/project/instance"
/** ConfigProvider that enables the experimental file watcher. */
export const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
/**
* Boot an Instance with the given service layers and run `body` with
* the ManagedRuntime. Cleanup is automatic — the runtime is disposed
* and Instance context is torn down when `body` completes.
*
* Layers may depend on InstanceContext (provided automatically).
* Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
*/
export function withServices<S>(
directory: string,
layer: Layer.Layer<S, any, InstanceContext>,
body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
options?: { provide?: Layer.Layer<never>[] },
) {
return Instance.provide({
directory,
fn: async () => {
const ctx = Layer.sync(InstanceContext, () =>
InstanceContext.of({
directory: Instance.directory,
worktree: Instance.worktree,
project: Instance.project,
}),
)
let resolved: Layer.Layer<S> = layer.pipe(Layer.provide(ctx)) as any
if (options?.provide) {
for (const l of options.provide) {
resolved = resolved.pipe(Layer.provide(l)) as any
}
}
const rt = ManagedRuntime.make(resolved)
try {
await body(rt)
} finally {
await rt.dispose()
}
},
})
}

View File

@@ -1,172 +1,182 @@
import { Effect } from "effect"
import { afterEach, describe, expect, test } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import { withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { File } from "../../src/file"
import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
import { describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { Format } from "../../src/format"
import * as Formatter from "../../src/format/formatter"
import { Instance } from "../../src/project/instance"
const node = NodeChildProcessSpawner.layer.pipe(
Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
)
const it = testEffect(Layer.mergeAll(Format.layer, node))
describe("Format", () => {
afterEach(async () => {
await Instance.disposeAll()
})
it.effect("status() returns built-in formatters when no config overrides", () =>
provideTmpdirInstance(() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
const statuses = yield* fmt.status()
expect(Array.isArray(statuses)).toBe(true)
expect(statuses.length).toBeGreaterThan(0)
test("status() returns built-in formatters when no config overrides", async () => {
await using tmp = await tmpdir()
for (const item of statuses) {
expect(typeof item.name).toBe("string")
expect(Array.isArray(item.extensions)).toBe(true)
expect(typeof item.enabled).toBe("boolean")
}
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
expect(Array.isArray(statuses)).toBe(true)
expect(statuses.length).toBeGreaterThan(0)
const gofmt = statuses.find((item) => item.name === "gofmt")
expect(gofmt).toBeDefined()
expect(gofmt!.extensions).toContain(".go")
}),
),
),
)
for (const s of statuses) {
expect(typeof s.name).toBe("string")
expect(Array.isArray(s.extensions)).toBe(true)
expect(typeof s.enabled).toBe("boolean")
}
it.effect("status() returns empty list when formatter is disabled", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
expect(yield* fmt.status()).toEqual([])
}),
),
{ config: { formatter: false } },
),
)
const gofmt = statuses.find((s) => s.name === "gofmt")
expect(gofmt).toBeDefined()
expect(gofmt!.extensions).toContain(".go")
})
})
test("status() returns empty list when formatter is disabled", async () => {
await using tmp = await tmpdir({
config: { formatter: false },
})
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
expect(statuses).toEqual([])
})
})
test("status() excludes formatters marked as disabled in config", async () => {
await using tmp = await tmpdir({
config: {
formatter: {
gofmt: { disabled: true },
it.effect("status() excludes formatters marked as disabled in config", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
const statuses = yield* fmt.status()
const gofmt = statuses.find((item) => item.name === "gofmt")
expect(gofmt).toBeUndefined()
}),
),
{
config: {
formatter: {
gofmt: { disabled: true },
},
},
},
})
),
)
await withServices(tmp.path, Format.layer, async (rt) => {
const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
const gofmt = statuses.find((s) => s.name === "gofmt")
expect(gofmt).toBeUndefined()
})
})
it.effect("service initializes without error", () =>
provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
)
test("service initializes without error", async () => {
await using tmp = await tmpdir()
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use(() => Effect.void))
})
})
test("status() initializes formatter state per directory", async () => {
await using off = await tmpdir({
config: { formatter: false },
})
await using on = await tmpdir()
const a = await Instance.provide({
directory: off.path,
fn: () => Format.status(),
})
const b = await Instance.provide({
directory: on.path,
fn: () => Format.status(),
})
expect(a).toEqual([])
expect(b.length).toBeGreaterThan(0)
})
test("runs enabled checks for matching formatters in parallel", async () => {
await using tmp = await tmpdir()
const file = `${tmp.path}/test.parallel`
await Bun.write(file, "x")
const one = {
extensions: Formatter.gofmt.extensions,
enabled: Formatter.gofmt.enabled,
command: Formatter.gofmt.command,
}
const two = {
extensions: Formatter.mix.extensions,
enabled: Formatter.mix.enabled,
command: Formatter.mix.command,
}
let active = 0
let max = 0
Formatter.gofmt.extensions = [".parallel"]
Formatter.mix.extensions = [".parallel"]
Formatter.gofmt.command = ["sh", "-c", "true"]
Formatter.mix.command = ["sh", "-c", "true"]
Formatter.gofmt.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
Formatter.mix.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
try {
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use((s) => s.init()))
await Bus.publish(File.Event.Edited, { file })
it.effect("status() initializes formatter state per directory", () =>
Effect.gen(function* () {
const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
config: { formatter: false },
})
} finally {
Formatter.gofmt.extensions = one.extensions
Formatter.gofmt.enabled = one.enabled
Formatter.gofmt.command = one.command
Formatter.mix.extensions = two.extensions
Formatter.mix.enabled = two.enabled
Formatter.mix.command = two.command
}
const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()))
expect(max).toBe(2)
})
expect(a).toEqual([])
expect(b.length).toBeGreaterThan(0)
}),
)
test("runs matching formatters sequentially for the same file", async () => {
await using tmp = await tmpdir({
config: {
formatter: {
first: {
command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
second: {
command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
it.effect("runs enabled checks for matching formatters in parallel", () =>
provideTmpdirInstance((path) =>
Effect.gen(function* () {
const file = `${path}/test.parallel`
yield* Effect.promise(() => Bun.write(file, "x"))
const one = {
extensions: Formatter.gofmt.extensions,
enabled: Formatter.gofmt.enabled,
command: Formatter.gofmt.command,
}
const two = {
extensions: Formatter.mix.extensions,
enabled: Formatter.mix.enabled,
command: Formatter.mix.command,
}
let active = 0
let max = 0
yield* Effect.acquireUseRelease(
Effect.sync(() => {
Formatter.gofmt.extensions = [".parallel"]
Formatter.mix.extensions = [".parallel"]
Formatter.gofmt.command = ["sh", "-c", "true"]
Formatter.mix.command = ["sh", "-c", "true"]
Formatter.gofmt.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
Formatter.mix.enabled = async () => {
active++
max = Math.max(max, active)
await Bun.sleep(20)
active--
return true
}
}),
() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
yield* fmt.init()
yield* fmt.file(file)
}),
),
() =>
Effect.sync(() => {
Formatter.gofmt.extensions = one.extensions
Formatter.gofmt.enabled = one.enabled
Formatter.gofmt.command = one.command
Formatter.mix.extensions = two.extensions
Formatter.mix.enabled = two.enabled
Formatter.mix.command = two.command
}),
)
expect(max).toBe(2)
}),
),
)
it.effect("runs matching formatters sequentially for the same file", () =>
provideTmpdirInstance(
(path) =>
Effect.gen(function* () {
const file = `${path}/test.seq`
yield* Effect.promise(() => Bun.write(file, "x"))
yield* Format.Service.use((fmt) =>
Effect.gen(function* () {
yield* fmt.init()
yield* fmt.file(file)
}),
)
expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB")
}),
{
config: {
formatter: {
first: {
command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
second: {
command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
extensions: [".seq"],
},
},
},
},
})
const file = `${tmp.path}/test.seq`
await Bun.write(file, "x")
await withServices(tmp.path, Format.layer, async (rt) => {
await rt.runPromise(Format.Service.use((s) => s.init()))
await Bus.publish(File.Event.Edited, { file })
})
expect(await Bun.file(file).text()).toBe("xAB")
})
),
)
})

View File

@@ -2,9 +2,7 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Effect, Layer, ManagedRuntime } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
@@ -17,21 +15,16 @@ const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe
// Helpers
// ---------------------------------------------------------------------------
function withVcs(
directory: string,
body: (rt: ManagedRuntime.ManagedRuntime<FileWatcher.Service | Vcs.Service, never>) => Promise<void>,
) {
return withServices(
async function withVcs(directory: string, body: () => Promise<void>) {
return Instance.provide({
directory,
Layer.merge(FileWatcher.layer, Vcs.layer),
async (rt) => {
await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
await rt.runPromise(Vcs.Service.use((s) => s.init()))
fn: async () => {
FileWatcher.init()
Vcs.init()
await Bun.sleep(500)
await body(rt)
await body()
},
{ provide: [watcherConfigLayer] },
)
})
}
type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
@@ -74,8 +67,8 @@ describeVcs("Vcs", () => {
test("branch() returns current branch name", async () => {
await using tmp = await tmpdir({ git: true })
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
await withVcs(tmp.path, async () => {
const branch = await Vcs.branch()
expect(branch).toBeDefined()
expect(typeof branch).toBe("string")
})
@@ -84,8 +77,8 @@ describeVcs("Vcs", () => {
test("branch() returns undefined for non-git directories", async () => {
await using tmp = await tmpdir()
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
await withVcs(tmp.path, async () => {
const branch = await Vcs.branch()
expect(branch).toBeUndefined()
})
})
@@ -111,14 +104,14 @@ describeVcs("Vcs", () => {
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async (rt) => {
await withVcs(tmp.path, async () => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
await pending
const current = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
const current = await Vcs.branch()
expect(current).toBe(branch)
})
})

View File

@@ -89,7 +89,6 @@ describe("tool.edit", () => {
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -102,9 +101,7 @@ describe("tool.edit", () => {
ctx,
)
expect(events).toContain("edited")
expect(events).toContain("updated")
unsubEdited()
unsubUpdated()
},
})
@@ -305,11 +302,9 @@ describe("tool.edit", () => {
await FileTime.read(ctx.sessionID, filepath)
const { Bus } = await import("../../src/bus")
const { File } = await import("../../src/file")
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -322,9 +317,7 @@ describe("tool.edit", () => {
ctx,
)
expect(events).toContain("edited")
expect(events).toContain("updated")
unsubEdited()
unsubUpdated()
},
})

View File

@@ -46,8 +46,6 @@ import type {
GlobalDisposeResponses,
GlobalEventResponses,
GlobalHealthResponses,
GlobalUpgradeErrors,
GlobalUpgradeResponses,
InstanceDisposeResponses,
LspStatusResponses,
McpAddErrors,
@@ -230,62 +228,6 @@ class HeyApiRegistry<T> {
}
}
export class Auth extends HeyApiClient {
/**
* Remove auth credentials
*
* Remove authentication credentials
*/
public remove<ThrowOnError extends boolean = false>(
parameters: {
providerID: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams([parameters], [{ args: [{ in: "path", key: "providerID" }] }])
return (options?.client ?? this.client).delete<AuthRemoveResponses, AuthRemoveErrors, ThrowOnError>({
url: "/auth/{providerID}",
...options,
...params,
})
}
/**
* Set auth credentials
*
* Set authentication credentials
*/
public set<ThrowOnError extends boolean = false>(
parameters: {
providerID: string
auth?: Auth3
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "path", key: "providerID" },
{ key: "auth", map: "body" },
],
},
],
)
return (options?.client ?? this.client).put<AuthSetResponses, AuthSetErrors, ThrowOnError>({
url: "/auth/{providerID}",
...options,
...params,
headers: {
"Content-Type": "application/json",
...options?.headers,
...params.headers,
},
})
}
}
export class Config extends HeyApiClient {
/**
* Get global configuration
@@ -361,20 +303,57 @@ export class Global extends HeyApiClient {
})
}
private _config?: Config
get config(): Config {
return (this._config ??= new Config({ client: this.client }))
}
}
export class Auth extends HeyApiClient {
/**
* Upgrade opencode
* Remove auth credentials
*
* Upgrade opencode to the specified version or latest if not specified.
* Remove authentication credentials
*/
public upgrade<ThrowOnError extends boolean = false>(
parameters?: {
target?: string
public remove<ThrowOnError extends boolean = false>(
parameters: {
providerID: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams([parameters], [{ args: [{ in: "body", key: "target" }] }])
return (options?.client ?? this.client).post<GlobalUpgradeResponses, GlobalUpgradeErrors, ThrowOnError>({
url: "/global/upgrade",
const params = buildClientParams([parameters], [{ args: [{ in: "path", key: "providerID" }] }])
return (options?.client ?? this.client).delete<AuthRemoveResponses, AuthRemoveErrors, ThrowOnError>({
url: "/auth/{providerID}",
...options,
...params,
})
}
/**
* Set auth credentials
*
* Set authentication credentials
*/
public set<ThrowOnError extends boolean = false>(
parameters: {
providerID: string
auth?: Auth3
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "path", key: "providerID" },
{ key: "auth", map: "body" },
],
},
],
)
return (options?.client ?? this.client).put<AuthSetResponses, AuthSetErrors, ThrowOnError>({
url: "/auth/{providerID}",
...options,
...params,
headers: {
@@ -384,11 +363,6 @@ export class Global extends HeyApiClient {
},
})
}
private _config?: Config
get config(): Config {
return (this._config ??= new Config({ client: this.client }))
}
}
export class Project extends HeyApiClient {
@@ -3932,16 +3906,16 @@ export class OpencodeClient extends HeyApiClient {
OpencodeClient.__registry.set(this, args?.key)
}
private _auth?: Auth
get auth(): Auth {
return (this._auth ??= new Auth({ client: this.client }))
}
private _global?: Global
get global(): Global {
return (this._global ??= new Global({ client: this.client }))
}
private _auth?: Auth
get auth(): Auth {
return (this._auth ??= new Auth({ client: this.client }))
}
private _project?: Project
get project(): Project {
return (this._project ??= new Project({ client: this.client }))

View File

@@ -4,36 +4,6 @@ export type ClientOptions = {
baseUrl: `${string}://${string}` | (string & {})
}
export type BadRequestError = {
data: unknown
errors: Array<{
[key: string]: unknown
}>
success: false
}
export type OAuth = {
type: "oauth"
refresh: string
access: string
expires: number
accountId?: string
enterpriseUrl?: string
}
export type ApiAuth = {
type: "api"
key: string
}
export type WellKnownAuth = {
type: "wellknown"
key: string
token: string
}
export type Auth = OAuth | ApiAuth | WellKnownAuth
export type EventInstallationUpdated = {
type: "installation.updated"
properties: {
@@ -1536,6 +1506,36 @@ export type Config = {
}
}
export type BadRequestError = {
data: unknown
errors: Array<{
[key: string]: unknown
}>
success: false
}
export type OAuth = {
type: "oauth"
refresh: string
access: string
expires: number
accountId?: string
enterpriseUrl?: string
}
export type ApiAuth = {
type: "api"
key: string
}
export type WellKnownAuth = {
type: "wellknown"
key: string
token: string
}
export type Auth = OAuth | ApiAuth | WellKnownAuth
export type NotFoundError = {
name: "NotFoundError"
data: {
@@ -1938,60 +1938,6 @@ export type FormatterStatus = {
enabled: boolean
}
export type AuthRemoveData = {
body?: never
path: {
providerID: string
}
query?: never
url: "/auth/{providerID}"
}
export type AuthRemoveErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type AuthRemoveError = AuthRemoveErrors[keyof AuthRemoveErrors]
export type AuthRemoveResponses = {
/**
* Successfully removed authentication credentials
*/
200: boolean
}
export type AuthRemoveResponse = AuthRemoveResponses[keyof AuthRemoveResponses]
export type AuthSetData = {
body?: Auth
path: {
providerID: string
}
query?: never
url: "/auth/{providerID}"
}
export type AuthSetErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type AuthSetError = AuthSetErrors[keyof AuthSetErrors]
export type AuthSetResponses = {
/**
* Successfully set authentication credentials
*/
200: boolean
}
export type AuthSetResponse = AuthSetResponses[keyof AuthSetResponses]
export type GlobalHealthData = {
body?: never
path?: never
@@ -2084,40 +2030,59 @@ export type GlobalDisposeResponses = {
export type GlobalDisposeResponse = GlobalDisposeResponses[keyof GlobalDisposeResponses]
export type GlobalUpgradeData = {
body?: {
target?: string
export type AuthRemoveData = {
body?: never
path: {
providerID: string
}
path?: never
query?: never
url: "/global/upgrade"
url: "/auth/{providerID}"
}
export type GlobalUpgradeErrors = {
export type AuthRemoveErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type GlobalUpgradeError = GlobalUpgradeErrors[keyof GlobalUpgradeErrors]
export type AuthRemoveError = AuthRemoveErrors[keyof AuthRemoveErrors]
export type GlobalUpgradeResponses = {
export type AuthRemoveResponses = {
/**
* Upgrade result
* Successfully removed authentication credentials
*/
200:
| {
success: true
version: string
}
| {
success: false
error: string
}
200: boolean
}
export type GlobalUpgradeResponse = GlobalUpgradeResponses[keyof GlobalUpgradeResponses]
export type AuthRemoveResponse = AuthRemoveResponses[keyof AuthRemoveResponses]
export type AuthSetData = {
body?: Auth
path: {
providerID: string
}
query?: never
url: "/auth/{providerID}"
}
export type AuthSetErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type AuthSetError = AuthSetErrors[keyof AuthSetErrors]
export type AuthSetResponses = {
/**
* Successfully set authentication credentials
*/
200: boolean
}
export type AuthSetResponse = AuthSetResponses[keyof AuthSetResponses]
export type ProjectListData = {
body?: never