Compare commits

..

2 Commits

Author SHA1 Message Date
Kit Langton
8e8ac7abe9 Merge branch 'dev' into kit/effectify-session-status 2026-03-19 21:16:41 -04:00
Kit Langton
3217d112ec effectify SessionStatus service, add runSyncInstance helper 2026-03-19 20:24:49 -04:00
9 changed files with 172 additions and 194 deletions

View File

@@ -123,10 +123,11 @@ Done now:
- [x] `Discovery`
- [x] `File`
- [x] `Snapshot`
- [x] `SessionStatus`
Still open and likely worth migrating:
- [x] `Plugin`
- [ ] `Plugin`
- [ ] `ToolRegistry`
- [ ] `Pty`
- [ ] `Worktree`

View File

@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodObject, ZodRawShape } from "zod"
import type { ZodType } from "zod"
import { Log } from "../util/log"
export namespace BusEvent {
@@ -9,7 +9,7 @@ export namespace BusEvent {
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,

View File

@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
payload: { type: string; properties: Record<string, unknown> }
payload: any
},
]
}>()

View File

@@ -124,7 +124,7 @@ export namespace Workspace {
await parseSSE(res.body, stop, (event) => {
GlobalBus.emit("event", {
directory: space.id,
payload: event as { type: string; properties: Record<string, unknown> },
payload: event,
})
})
// Wait 250ms and retry if SSE connection fails

View File

@@ -8,9 +8,9 @@ import { Instance } from "@/project/instance"
import { Vcs } from "@/project/vcs"
import { ProviderAuth } from "@/provider/auth"
import { Question } from "@/question"
import { SessionStatus } from "@/session/status"
import { Skill } from "@/skill/skill"
import { Snapshot } from "@/snapshot"
import { Plugin } from "@/plugin"
import { InstanceContext } from "./instance-context"
import { registerDisposer } from "./instance-registry"
@@ -25,9 +25,9 @@ export type InstanceServices =
| FileTime.Service
| Format.Service
| File.Service
| SessionStatus.Service
| Skill.Service
| Snapshot.Service
| Plugin.Service
// NOTE: LayerMap only passes the key (directory string) to lookup, but we need
// the full instance context (directory, worktree, project). We read from the
@@ -35,7 +35,7 @@ export type InstanceServices =
// runPromiseInstance -> Instances.get, which always runs inside Instance.provide.
// This should go away once the old Instance type is removed and lookup can load
// the full context directly.
function lookup(_key: string): Layer.Layer<InstanceServices> {
function lookup(_key: string) {
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
return Layer.mergeAll(
Layer.fresh(Question.layer),
@@ -46,9 +46,9 @@ function lookup(_key: string): Layer.Layer<InstanceServices> {
Layer.fresh(FileTime.layer).pipe(Layer.orDie),
Layer.fresh(Format.layer),
Layer.fresh(File.layer),
Layer.fresh(SessionStatus.layer),
Layer.fresh(Skill.defaultLayer),
Layer.fresh(Snapshot.defaultLayer),
Layer.fresh(Plugin.layer),
).pipe(Layer.provide(ctx))
}

View File

@@ -18,6 +18,10 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runSyncInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -5,15 +5,13 @@ import { Log } from "../util/log"
import { createOpencodeClient } from "@opencode-ai/sdk"
import { Server } from "../server/server"
import { BunProc } from "../bun"
import { Instance } from "../project/instance"
import { Flag } from "../flag/flag"
import { CodexAuthPlugin } from "./codex"
import { Session } from "../session"
import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"
import { gitlabAuthPlugin as GitlabAuthPlugin } from "@gitlab/opencode-gitlab-auth"
import { Effect, Fiber, Layer, ServiceMap } from "effect"
import { InstanceContext } from "@/effect/instance-context"
import { runPromiseInstance } from "@/effect/runtime"
export namespace Plugin {
const log = Log.create({ service: "plugin" })
@@ -21,173 +19,126 @@ export namespace Plugin {
// Built-in plugins that are directly imported (not installed from npm)
const INTERNAL_PLUGINS: PluginInstance[] = [CodexAuthPlugin, CopilotAuthPlugin, GitlabAuthPlugin]
export interface Interface {
readonly trigger: <
Name extends Exclude<keyof Required<Hooks>, "auth" | "event" | "tool">,
Input = Parameters<Required<Hooks>[Name]>[0],
Output = Parameters<Required<Hooks>[Name]>[1],
>(
name: Name,
input: Input,
output: Output,
) => Effect.Effect<Output>
readonly list: () => Effect.Effect<Hooks[]>
readonly init: () => Effect.Effect<void>
}
const state = Instance.state(async () => {
const client = createOpencodeClient({
baseUrl: "http://localhost:4096",
directory: Instance.directory,
headers: Flag.OPENCODE_SERVER_PASSWORD
? {
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
}
: undefined,
fetch: async (...args) => Server.Default().fetch(...args),
})
const config = await Config.get()
const hooks: Hooks[] = []
const input: PluginInput = {
client,
project: Instance.project,
worktree: Instance.worktree,
directory: Instance.directory,
get serverUrl(): URL {
return Server.url ?? new URL("http://localhost:4096")
},
$: Bun.$,
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Plugin") {}
for (const plugin of INTERNAL_PLUGINS) {
log.info("loading internal plugin", { name: plugin.name })
const init = await plugin(input).catch((err) => {
log.error("failed to load internal plugin", { name: plugin.name, error: err })
})
if (init) hooks.push(init)
}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const hooks: Hooks[] = []
let plugins = config.plugin ?? []
if (plugins.length) await Config.waitForDependencies()
const load = Effect.fn("Plugin.load")(function* () {
yield* Effect.promise(async () => {
const client = createOpencodeClient({
baseUrl: "http://localhost:4096",
directory: instance.directory,
headers: Flag.OPENCODE_SERVER_PASSWORD
? {
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
}
: undefined,
fetch: async (...args) => Server.Default().fetch(...args),
for (let plugin of plugins) {
// ignore old codex plugin since it is supported first party now
if (plugin.includes("opencode-openai-codex-auth") || plugin.includes("opencode-copilot-auth")) continue
log.info("loading plugin", { path: plugin })
if (!plugin.startsWith("file://")) {
const lastAtIndex = plugin.lastIndexOf("@")
const pkg = lastAtIndex > 0 ? plugin.substring(0, lastAtIndex) : plugin
const version = lastAtIndex > 0 ? plugin.substring(lastAtIndex + 1) : "latest"
plugin = await BunProc.install(pkg, version).catch((err) => {
const cause = err instanceof Error ? err.cause : err
const detail = cause instanceof Error ? cause.message : String(cause ?? err)
log.error("failed to install plugin", { pkg, version, error: detail })
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
}).toObject(),
})
const config = await Config.get()
const input: PluginInput = {
client,
project: instance.project,
worktree: instance.worktree,
directory: instance.directory,
get serverUrl(): URL {
return Server.url ?? new URL("http://localhost:4096")
},
$: Bun.$,
}
for (const plugin of INTERNAL_PLUGINS) {
log.info("loading internal plugin", { name: plugin.name })
const init = await plugin(input).catch((err) => {
log.error("failed to load internal plugin", { name: plugin.name, error: err })
})
if (init) hooks.push(init)
}
let plugins = config.plugin ?? []
if (plugins.length) await Config.waitForDependencies()
for (let plugin of plugins) {
// ignore old codex plugin since it is supported first party now
if (plugin.includes("opencode-openai-codex-auth") || plugin.includes("opencode-copilot-auth")) continue
log.info("loading plugin", { path: plugin })
if (!plugin.startsWith("file://")) {
const lastAtIndex = plugin.lastIndexOf("@")
const pkg = lastAtIndex > 0 ? plugin.substring(0, lastAtIndex) : plugin
const version = lastAtIndex > 0 ? plugin.substring(lastAtIndex + 1) : "latest"
plugin = await BunProc.install(pkg, version).catch((err) => {
const cause = err instanceof Error ? err.cause : err
const detail = cause instanceof Error ? cause.message : String(cause ?? err)
log.error("failed to install plugin", { pkg, version, error: detail })
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
}).toObject(),
})
return ""
})
if (!plugin) continue
}
// Prevent duplicate initialization when plugins export the same function
// as both a named export and default export (e.g., `export const X` and `export default X`).
// Object.entries(mod) would return both entries pointing to the same function reference.
await import(plugin)
.then(async (mod) => {
const seen = new Set<PluginInstance>()
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
if (seen.has(fn)) continue
seen.add(fn)
hooks.push(await fn(input))
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err)
log.error("failed to load plugin", { path: plugin, error: message })
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to load plugin ${plugin}: ${message}`,
}).toObject(),
})
})
return ""
})
if (!plugin) continue
}
// Prevent duplicate initialization when plugins export the same function
// as both a named export and default export (e.g., `export const X` and `export default X`).
// Object.entries(mod) would return both entries pointing to the same function reference.
await import(plugin)
.then(async (mod) => {
const seen = new Set<PluginInstance>()
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
if (seen.has(fn)) continue
seen.add(fn)
hooks.push(await fn(input))
}
})
})
const loadFiber = yield* load().pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("init failed", { cause }))),
Effect.forkScoped,
)
const trigger = Effect.fn("Plugin.trigger")(function* <
Name extends Exclude<keyof Required<Hooks>, "auth" | "event" | "tool">,
Input = Parameters<Required<Hooks>[Name]>[0],
Output = Parameters<Required<Hooks>[Name]>[1],
>(name: Name, input: Input, output: Output) {
if (!name) return output
yield* Fiber.join(loadFiber)
yield* Effect.promise(async () => {
for (const hook of hooks) {
const fn = hook[name]
if (!fn) continue
// @ts-expect-error if you feel adventurous, please fix the typing, make sure to bump the try-counter if you
// give up.
// try-counter: 2
await fn(input, output)
}
})
return output
})
const list = Effect.fn("Plugin.list")(function* () {
yield* Fiber.join(loadFiber)
return hooks
})
const init = Effect.fn("Plugin.init")(function* () {
yield* Fiber.join(loadFiber)
yield* Effect.promise(async () => {
const config = await Config.get()
for (const hook of hooks) {
await (hook as any).config?.(config)
}
Bus.subscribeAll(async (input) => {
for (const hook of hooks) {
hook["event"]?.({
event: input,
})
}
.catch((err) => {
const message = err instanceof Error ? err.message : String(err)
log.error("failed to load plugin", { path: plugin, error: message })
Bus.publish(Session.Event.Error, {
error: new NamedError.Unknown({
message: `Failed to load plugin ${plugin}: ${message}`,
}).toObject(),
})
})
})
}
return Service.of({ trigger, list, init })
}),
)
return {
hooks,
input,
}
})
export async function trigger<
Name extends Exclude<keyof Required<Hooks>, "auth" | "event" | "tool">,
Input = Parameters<Required<Hooks>[Name]>[0],
Output = Parameters<Required<Hooks>[Name]>[1],
>(name: Name, input: Input, output: Output): Promise<Output> {
return runPromiseInstance(Service.use((svc) => svc.trigger(name, input, output)))
if (!name) return output
for (const hook of await state().then((x) => x.hooks)) {
const fn = hook[name]
if (!fn) continue
// @ts-expect-error if you feel adventurous, please fix the typing, make sure to bump the try-counter if you
// give up.
// try-counter: 2
await fn(input, output)
}
return output
}
export async function list(): Promise<Hooks[]> {
return runPromiseInstance(Service.use((svc) => svc.list()))
export async function list() {
return state().then((x) => x.hooks)
}
export async function init() {
return runPromiseInstance(Service.use((svc) => svc.init()))
const hooks = await state().then((x) => x.hooks)
const config = await Config.get()
for (const hook of hooks) {
// @ts-expect-error this is because we haven't moved plugin to sdk v2
await hook.config?.(config)
}
Bus.subscribeAll(async (input) => {
const hooks = await state().then((x) => x.hooks)
for (const hook of hooks) {
hook["event"]?.({
event: input,
})
}
})
}
}

View File

@@ -1,7 +1,8 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { Instance } from "@/project/instance"
import { runSyncInstance } from "@/effect/runtime"
import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
export namespace SessionStatus {
@@ -42,36 +43,51 @@ export namespace SessionStatus {
),
}
const state = Instance.state(() => {
const data: Record<string, Info> = {}
return data
})
export function get(sessionID: SessionID) {
return (
state()[sessionID] ?? {
type: "idle",
}
)
export interface Interface {
readonly get: (sessionID: SessionID) => Effect.Effect<Info>
readonly list: () => Effect.Effect<Record<string, Info>>
readonly set: (sessionID: SessionID, status: Info) => Effect.Effect<void>
}
export function list() {
return state()
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionStatus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const data = new Map<SessionID, Info>()
const get = Effect.fn("SessionStatus.get")(function* (sessionID: SessionID) {
return data.get(sessionID) ?? { type: "idle" as const }
})
const list = Effect.fn("SessionStatus.list")(function* () {
return Object.fromEntries(data)
})
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
Bus.publish(Event.Status, { sessionID, status })
if (status.type === "idle") {
// deprecated
Bus.publish(Event.Idle, { sessionID })
data.delete(sessionID)
return
}
data.set(sessionID, status)
})
return Service.of({ get, list, set })
}),
)
export function get(sessionID: SessionID): Info {
return runSyncInstance(Service.use((svc) => svc.get(sessionID)))
}
export function list(): Record<string, Info> {
return runSyncInstance(Service.use((svc) => svc.list()))
}
export function set(sessionID: SessionID, status: Info) {
Bus.publish(Event.Status, {
sessionID,
status,
})
if (status.type === "idle") {
// deprecated
Bus.publish(Event.Idle, {
sessionID,
})
delete state()[sessionID]
return
}
state()[sessionID] = status
runSyncInstance(Service.use((svc) => svc.set(sessionID, status)))
}
}

View File

@@ -5,9 +5,9 @@ import path from "path"
import { Deferred, Effect, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
@@ -16,6 +16,7 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
@@ -35,17 +36,22 @@ function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
let done = false
const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
function on(evt: BusUpdate) {
if (done) return
if (!check(evt.properties)) return
hit(evt.properties)
})
if (evt.directory !== directory) return
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
if (!check(evt.payload.properties)) return
hit(evt.payload.properties)
}
return () => {
function cleanup() {
if (done) return
done = true
unsub()
GlobalBus.off("event", on)
}
GlobalBus.on("event", on)
return cleanup
}
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {