Compare commits

..

1 Commits

Author SHA1 Message Date
Kit Langton
052708056a effectify Skill service internals: replace Effect.promise with native Effect
Convert the Skill service's internal async helpers (add, scan, loadSkills) from
plain async functions wrapped in Effect.promise to native Effect generators using
Effect.fnUntraced. Yield Config.Service and Bus.Service directly instead of going
through their async facades, and yield discovery.pull as a native Effect rather
than bridging through Effect.runPromise.
2026-03-26 22:39:01 -04:00
3 changed files with 183 additions and 149 deletions

View File

@@ -53,112 +53,100 @@ export namespace Plugin {
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const config = yield* Config.Service
const cache = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
const { Server } = yield* Effect.promise(() => import("../server/server"))
yield* Effect.promise(async () => {
const { Server } = await import("../server/server")
const client = createOpencodeClient({
baseUrl: "http://localhost:4096",
directory: ctx.directory,
headers: Flag.OPENCODE_SERVER_PASSWORD
? {
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
}
: undefined,
fetch: async (...args) => Server.Default().fetch(...args),
})
const cfg = yield* config.get()
const input: PluginInput = {
client,
project: ctx.project,
worktree: ctx.worktree,
directory: ctx.directory,
get serverUrl(): URL {
return Server.url ?? new URL("http://localhost:4096")
},
$: Bun.$,
}
for (const plugin of INTERNAL_PLUGINS) {
log.info("loading internal plugin", { name: plugin.name })
const init = yield* Effect.tryPromise({
try: () => plugin(input),
catch: (err) => {
log.error("failed to load internal plugin", { name: plugin.name, error: err })
const client = createOpencodeClient({
baseUrl: "http://localhost:4096",
directory: ctx.directory,
headers: Flag.OPENCODE_SERVER_PASSWORD
? {
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
}
: undefined,
fetch: async (...args) => Server.Default().fetch(...args),
})
const cfg = await Config.get()
const input: PluginInput = {
client,
project: ctx.project,
worktree: ctx.worktree,
directory: ctx.directory,
get serverUrl(): URL {
return Server.url ?? new URL("http://localhost:4096")
},
}).pipe(Effect.option)
if (init._tag === "Some") hooks.push(init.value)
}
$: Bun.$,
}
const plugins = cfg.plugin ?? []
if (plugins.length) yield* config.waitForDependencies()
for (const plugin of INTERNAL_PLUGINS) {
log.info("loading internal plugin", { name: plugin.name })
const init = await plugin(input).catch((err) => {
log.error("failed to load internal plugin", { name: plugin.name, error: err })
})
if (init) hooks.push(init)
}
for (let pluginPath of plugins) {
if (DEPRECATED_PLUGIN_PACKAGES.some((pkg) => pluginPath.includes(pkg))) continue
log.info("loading plugin", { path: pluginPath })
if (!pluginPath.startsWith("file://")) {
const idx = pluginPath.lastIndexOf("@")
const pkg = idx > 0 ? pluginPath.substring(0, idx) : pluginPath
const version = idx > 0 ? pluginPath.substring(idx + 1) : "latest"
const installed = yield* Effect.tryPromise({
try: () => BunProc.install(pkg, version),
catch: (err) => {
let plugins = cfg.plugin ?? []
if (plugins.length) await Config.waitForDependencies()
for (let plugin of plugins) {
if (DEPRECATED_PLUGIN_PACKAGES.some((pkg) => plugin.includes(pkg))) continue
log.info("loading plugin", { path: plugin })
if (!plugin.startsWith("file://")) {
const idx = plugin.lastIndexOf("@")
const pkg = idx > 0 ? plugin.substring(0, idx) : plugin
const version = idx > 0 ? plugin.substring(idx + 1) : "latest"
plugin = await BunProc.install(pkg, version).catch((err) => {
const cause = err instanceof Error ? err.cause : err
const detail = cause instanceof Error ? cause.message : String(cause ?? err)
log.error("failed to install plugin", { pkg, version, error: detail })
return detail
},
}).pipe(Effect.catch((detail) =>
bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
}).toObject(),
}).pipe(Effect.as("")),
))
if (!installed) continue
pluginPath = installed
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
}).toObject(),
})
return ""
})
if (!plugin) continue
}
// Prevent duplicate initialization when plugins export the same function
// as both a named export and default export (e.g., `export const X` and `export default X`).
// Object.entries(mod) would return both entries pointing to the same function reference.
await import(plugin)
.then(async (mod) => {
const seen = new Set<PluginInstance>()
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
if (seen.has(fn)) continue
seen.add(fn)
hooks.push(await fn(input))
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err)
log.error("failed to load plugin", { path: plugin, error: message })
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to load plugin ${plugin}: ${message}`,
}).toObject(),
})
})
}
// Prevent duplicate initialization when plugins export the same function
// as both a named export and default export (e.g., `export const X` and `export default X`).
// Object.entries(mod) would return both entries pointing to the same function reference.
yield* Effect.tryPromise({
try: async () => {
const mod = await import(pluginPath)
const seen = new Set<PluginInstance>()
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
if (seen.has(fn)) continue
seen.add(fn)
hooks.push(await fn(input))
}
},
catch: (err) => {
const message = err instanceof Error ? err.message : String(err)
log.error("failed to load plugin", { path: pluginPath, error: message })
return message
},
}).pipe(Effect.catch((message) =>
bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to load plugin ${pluginPath}: ${message}`,
}).toObject(),
}),
))
}
// Notify plugins of current config
for (const hook of hooks) {
yield* Effect.tryPromise({
try: () => Promise.resolve((hook as any).config?.(cfg)),
catch: (err) => {
// Notify plugins of current config
for (const hook of hooks) {
try {
await (hook as any).config?.(cfg)
} catch (err) {
log.error("plugin config hook failed", { error: err })
},
}).pipe(Effect.ignore)
}
}
}
})
// Subscribe to bus events, fiber interrupted when scope closes
yield* bus.subscribeAll().pipe(
@@ -183,11 +171,13 @@ export namespace Plugin {
>(name: Name, input: Input, output: Output) {
if (!name) return output
const state = yield* InstanceState.get(cache)
for (const hook of state.hooks) {
const fn = hook[name] as any
if (!fn) continue
yield* Effect.promise(() => fn(input, output))
}
yield* Effect.promise(async () => {
for (const hook of state.hooks) {
const fn = hook[name] as any
if (!fn) continue
await fn(input, output)
}
})
return output
})
@@ -204,7 +194,7 @@ export namespace Plugin {
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer))
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function trigger<

View File

@@ -63,16 +63,23 @@ export namespace Skill {
readonly available: (agent?: Agent.Info) => Effect.Effect<Info[]>
}
const add = async (state: State, match: string) => {
const md = await ConfigMarkdown.parse(match).catch(async (err) => {
const message = ConfigMarkdown.FrontmatterError.isInstance(err)
? err.data.message
: `Failed to parse skill ${match}`
const { Session } = await import("@/session")
Bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
log.error("failed to load skill", { skill: match, err })
return undefined
})
const add = Effect.fnUntraced(function* (state: State, match: string, bus: Bus.Interface) {
const md = yield* Effect.tryPromise({
try: () => ConfigMarkdown.parse(match),
catch: (err) => err,
}).pipe(
Effect.catch(
Effect.fnUntraced(function* (err) {
const message = ConfigMarkdown.FrontmatterError.isInstance(err)
? err.data.message
: `Failed to parse skill ${match}`
const { Session } = yield* Effect.promise(() => import("@/session"))
yield* bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
log.error("failed to load skill", { skill: match, err })
return undefined
}),
),
)
if (!md) return
@@ -94,80 +101,112 @@ export namespace Skill {
location: match,
content: md.content,
}
}
})
const scan = async (state: State, root: string, pattern: string, opts?: { dot?: boolean; scope?: string }) => {
return Glob.scan(pattern, {
cwd: root,
absolute: true,
include: "file",
symlink: true,
dot: opts?.dot,
})
.then((matches) => Promise.all(matches.map((match) => add(state, match))))
.catch((error) => {
if (!opts?.scope) throw error
const scan = Effect.fnUntraced(function* (
state: State,
bus: Bus.Interface,
root: string,
pattern: string,
opts?: { dot?: boolean; scope?: string },
) {
const matches = yield* Effect.tryPromise({
try: () =>
Glob.scan(pattern, {
cwd: root,
absolute: true,
include: "file",
symlink: true,
dot: opts?.dot,
}),
catch: (error) => error,
}).pipe(
Effect.catch((error) => {
if (!opts?.scope) return Effect.die(error)
log.error(`failed to scan ${opts.scope} skills`, { dir: root, error })
})
}
return Effect.succeed([] as string[])
}),
)
async function loadSkills(state: State, discovery: Discovery.Interface, directory: string, worktree: string) {
yield* Effect.forEach(matches, (match) => add(state, match, bus), { discard: true })
})
const loadSkills = Effect.fnUntraced(function* (
state: State,
config: Config.Interface,
discovery: Discovery.Interface,
bus: Bus.Interface,
directory: string,
worktree: string,
) {
if (!Flag.OPENCODE_DISABLE_EXTERNAL_SKILLS) {
for (const dir of EXTERNAL_DIRS) {
const root = path.join(Global.Path.home, dir)
if (!(await Filesystem.isDir(root))) continue
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "global" })
const isDir = yield* Effect.promise(() => Filesystem.isDir(root))
if (!isDir) continue
yield* scan(state, bus, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "global" })
}
for await (const root of Filesystem.up({
targets: EXTERNAL_DIRS,
start: directory,
stop: worktree,
})) {
await scan(state, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "project" })
const upDirs = yield* Effect.promise(async () => {
const dirs: string[] = []
for await (const root of Filesystem.up({
targets: EXTERNAL_DIRS,
start: directory,
stop: worktree,
})) {
dirs.push(root)
}
return dirs
})
for (const root of upDirs) {
yield* scan(state, bus, root, EXTERNAL_SKILL_PATTERN, { dot: true, scope: "project" })
}
}
for (const dir of await Config.directories()) {
await scan(state, dir, OPENCODE_SKILL_PATTERN)
const configDirs = yield* config.directories()
for (const dir of configDirs) {
yield* scan(state, bus, dir, OPENCODE_SKILL_PATTERN)
}
const cfg = await Config.get()
const cfg = yield* config.get()
for (const item of cfg.skills?.paths ?? []) {
const expanded = item.startsWith("~/") ? path.join(os.homedir(), item.slice(2)) : item
const dir = path.isAbsolute(expanded) ? expanded : path.join(directory, expanded)
if (!(await Filesystem.isDir(dir))) {
const isDir = yield* Effect.promise(() => Filesystem.isDir(dir))
if (!isDir) {
log.warn("skill path not found", { path: dir })
continue
}
await scan(state, dir, SKILL_PATTERN)
yield* scan(state, bus, dir, SKILL_PATTERN)
}
for (const url of cfg.skills?.urls ?? []) {
for (const dir of await Effect.runPromise(discovery.pull(url))) {
const pulledDirs = yield* discovery.pull(url)
for (const dir of pulledDirs) {
state.dirs.add(dir)
await scan(state, dir, SKILL_PATTERN)
yield* scan(state, bus, dir, SKILL_PATTERN)
}
}
log.info("init", { count: Object.keys(state.skills).length })
}
})
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Skill") {}
export const layer: Layer.Layer<Service, never, Discovery.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, Discovery.Service | Config.Service | Bus.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const discovery = yield* Discovery.Service
const config = yield* Config.Service
const bus = yield* Bus.Service
const state = yield* InstanceState.make(
Effect.fn("Skill.state")((ctx) =>
Effect.gen(function* () {
const s: State = { skills: {}, dirs: new Set() }
yield* Effect.promise(() => loadSkills(s, discovery, ctx.directory, ctx.worktree))
return s
}),
),
Effect.fn("Skill.state")(function* (ctx) {
const s: State = { skills: {}, dirs: new Set() }
yield* loadSkills(s, config, discovery, bus, ctx.directory, ctx.worktree)
return s
}),
)
const get = Effect.fn("Skill.get")(function* (name: string) {
@@ -196,7 +235,11 @@ export namespace Skill {
}),
)
export const defaultLayer: Layer.Layer<Service> = layer.pipe(Layer.provide(Discovery.defaultLayer))
export const defaultLayer: Layer.Layer<Service> = layer.pipe(
Layer.provide(Discovery.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(Bus.layer),
)
export function fmt(list: Info[], opts: { verbose: boolean }) {
if (list.length === 0) return "No skills are currently available."

View File

@@ -61,11 +61,12 @@ describe("plugin.config-hook-error-isolation", () => {
test("config hooks are individually error-isolated in the layer factory", async () => {
const src = await Bun.file(file).text()
// Each hook's config call is wrapped in Effect.tryPromise with error logging + Effect.ignore
// The config hook try/catch lives in the InstanceState factory (layer definition),
// not in init() which now just delegates to the Effect service.
expect(src).toContain("plugin config hook failed")
const pattern =
/for\s*\(const hook of hooks\)\s*\{[\s\S]*?Effect\.tryPromise[\s\S]*?\.config\?\.\([\s\S]*?plugin config hook failed[\s\S]*?Effect\.ignore/
/for\s*\(const hook of hooks\)\s*\{[\s\S]*?try\s*\{[\s\S]*?\.config\?\.\([\s\S]*?\}\s*catch\s*\(err\)\s*\{[\s\S]*?plugin config hook failed[\s\S]*?\}/
expect(pattern.test(src)).toBe(true)
})
})