Compare commits

..

2 Commits

Author SHA1 Message Date
Kit Langton
8e8ac7abe9 Merge branch 'dev' into kit/effectify-session-status 2026-03-19 21:16:41 -04:00
Kit Langton
3217d112ec effectify SessionStatus service, add runSyncInstance helper 2026-03-19 20:24:49 -04:00
10 changed files with 152 additions and 162 deletions

View File

@@ -123,6 +123,7 @@ Done now:
- [x] `Discovery`
- [x] `File`
- [x] `Snapshot`
- [x] `SessionStatus`
Still open and likely worth migrating:
@@ -132,7 +133,7 @@ Still open and likely worth migrating:
- [ ] `Worktree`
- [ ] `Installation`
- [ ] `Bus`
- [x] `Command`
- [ ] `Command`
- [ ] `Config`
- [ ] `Session`
- [ ] `SessionProcessor`

View File

@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodObject, ZodRawShape } from "zod"
import type { ZodType } from "zod"
import { Log } from "../util/log"
export namespace BusEvent {
@@ -9,7 +9,7 @@ export namespace BusEvent {
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodObject<ZodRawShape>>(type: Type, properties: Properties) {
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,

View File

@@ -4,7 +4,7 @@ export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
payload: { type: string; properties: Record<string, unknown> }
payload: any
},
]
}>()

View File

@@ -1,18 +1,15 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceContext } from "@/effect/instance-context"
import { runPromiseInstance } from "@/effect/runtime"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Fiber, Layer, ServiceMap } from "effect"
import z from "zod"
import { Config } from "../config/config"
import { Instance } from "../project/instance"
import { Identifier } from "../id/id"
import PROMPT_INITIALIZE from "./template/initialize.txt"
import PROMPT_REVIEW from "./template/review.txt"
import { MCP } from "../mcp"
import { Skill } from "../skill"
import { Log } from "../util/log"
export namespace Command {
const log = Log.create({ service: "command" })
export const Event = {
Executed: BusEvent.define(
"command.executed",
@@ -60,126 +57,95 @@ export namespace Command {
REVIEW: "review",
} as const
export interface Interface {
readonly get: (name: string) => Effect.Effect<Info | undefined>
readonly list: () => Effect.Effect<Info[]>
}
const state = Instance.state(async () => {
const cfg = await Config.get()
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Command") {}
const result: Record<string, Info> = {
[Default.INIT]: {
name: Default.INIT,
description: "create/update AGENTS.md",
source: "command",
get template() {
return PROMPT_INITIALIZE.replace("${path}", Instance.worktree)
},
hints: hints(PROMPT_INITIALIZE),
},
[Default.REVIEW]: {
name: Default.REVIEW,
description: "review changes [commit|branch|pr], defaults to uncommitted",
source: "command",
get template() {
return PROMPT_REVIEW.replace("${path}", Instance.worktree)
},
subtask: true,
hints: hints(PROMPT_REVIEW),
},
}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
for (const [name, command] of Object.entries(cfg.command ?? {})) {
result[name] = {
name,
agent: command.agent,
model: command.model,
description: command.description,
source: "command",
get template() {
return command.template
},
subtask: command.subtask,
hints: hints(command.template),
}
}
for (const [name, prompt] of Object.entries(await MCP.prompts())) {
result[name] = {
name,
source: "mcp",
description: prompt.description,
get template() {
// since a getter can't be async we need to manually return a promise here
return new Promise<string>(async (resolve, reject) => {
const template = await MCP.getPrompt(
prompt.client,
prompt.name,
prompt.arguments
? // substitute each argument with $1, $2, etc.
Object.fromEntries(prompt.arguments?.map((argument, i) => [argument.name, `$${i + 1}`]))
: {},
).catch(reject)
resolve(
template?.messages
.map((message) => (message.content.type === "text" ? message.content.text : ""))
.join("\n") || "",
)
})
},
hints: prompt.arguments?.map((_, i) => `$${i + 1}`) ?? [],
}
}
const commands: Record<string, Info> = {}
// Add skills as invokable commands
for (const skill of await Skill.all()) {
// Skip if a command with this name already exists
if (result[skill.name]) continue
result[skill.name] = {
name: skill.name,
description: skill.description,
source: "skill",
get template() {
return skill.content
},
hints: [],
}
}
const load = Effect.fn("Command.load")(function* () {
yield* Effect.promise(async () => {
const cfg = await Config.get()
commands[Default.INIT] = {
name: Default.INIT,
description: "create/update AGENTS.md",
source: "command",
get template() {
return PROMPT_INITIALIZE.replace("${path}", instance.worktree)
},
hints: hints(PROMPT_INITIALIZE),
}
commands[Default.REVIEW] = {
name: Default.REVIEW,
description: "review changes [commit|branch|pr], defaults to uncommitted",
source: "command",
get template() {
return PROMPT_REVIEW.replace("${path}", instance.worktree)
},
subtask: true,
hints: hints(PROMPT_REVIEW),
}
for (const [name, command] of Object.entries(cfg.command ?? {})) {
commands[name] = {
name,
agent: command.agent,
model: command.model,
description: command.description,
source: "command",
get template() {
return command.template
},
subtask: command.subtask,
hints: hints(command.template),
}
}
for (const [name, prompt] of Object.entries(await MCP.prompts())) {
commands[name] = {
name,
source: "mcp",
description: prompt.description,
get template() {
// since a getter can't be async we need to manually return a promise here
return new Promise<string>(async (resolve, reject) => {
const template = await MCP.getPrompt(
prompt.client,
prompt.name,
prompt.arguments
? // substitute each argument with $1, $2, etc.
Object.fromEntries(prompt.arguments?.map((argument, i) => [argument.name, `$${i + 1}`]))
: {},
).catch(reject)
resolve(
template?.messages
.map((message) => (message.content.type === "text" ? message.content.text : ""))
.join("\n") || "",
)
})
},
hints: prompt.arguments?.map((_, i) => `$${i + 1}`) ?? [],
}
}
// Add skills as invokable commands
for (const skill of await Skill.all()) {
// Skip if a command with this name already exists
if (commands[skill.name]) continue
commands[skill.name] = {
name: skill.name,
description: skill.description,
source: "skill",
get template() {
return skill.content
},
hints: [],
}
}
})
})
const loadFiber = yield* load().pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("init failed", { cause }))),
Effect.forkScoped,
)
const get = Effect.fn("Command.get")(function* (name: string) {
yield* Fiber.join(loadFiber)
return commands[name]
})
const list = Effect.fn("Command.list")(function* () {
yield* Fiber.join(loadFiber)
return Object.values(commands)
})
return Service.of({ get, list })
}),
)
return result
})
export async function get(name: string) {
return runPromiseInstance(Service.use((svc) => svc.get(name)))
return state().then((x) => x[name])
}
export async function list() {
return runPromiseInstance(Service.use((svc) => svc.list()))
return state().then((x) => Object.values(x))
}
}

View File

@@ -124,7 +124,7 @@ export namespace Workspace {
await parseSSE(res.body, stop, (event) => {
GlobalBus.emit("event", {
directory: space.id,
payload: event as { type: string; properties: Record<string, unknown> },
payload: event,
})
})
// Wait 250ms and retry if SSE connection fails

View File

@@ -1,5 +1,4 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { Command } from "@/command"
import { File } from "@/file"
import { FileTime } from "@/file/time"
import { FileWatcher } from "@/file/watcher"
@@ -9,6 +8,7 @@ import { Instance } from "@/project/instance"
import { Vcs } from "@/project/vcs"
import { ProviderAuth } from "@/provider/auth"
import { Question } from "@/question"
import { SessionStatus } from "@/session/status"
import { Skill } from "@/skill/skill"
import { Snapshot } from "@/snapshot"
import { InstanceContext } from "./instance-context"
@@ -17,7 +17,6 @@ import { registerDisposer } from "./instance-registry"
export { InstanceContext } from "./instance-context"
export type InstanceServices =
| Command.Service
| Question.Service
| PermissionNext.Service
| ProviderAuth.Service
@@ -26,6 +25,7 @@ export type InstanceServices =
| FileTime.Service
| Format.Service
| File.Service
| SessionStatus.Service
| Skill.Service
| Snapshot.Service
@@ -38,7 +38,6 @@ export type InstanceServices =
function lookup(_key: string) {
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
return Layer.mergeAll(
Layer.fresh(Command.layer),
Layer.fresh(Question.layer),
Layer.fresh(PermissionNext.layer),
Layer.fresh(ProviderAuth.defaultLayer),
@@ -47,6 +46,7 @@ function lookup(_key: string) {
Layer.fresh(FileTime.layer).pipe(Layer.orDie),
Layer.fresh(Format.layer),
Layer.fresh(File.layer),
Layer.fresh(SessionStatus.layer),
Layer.fresh(Skill.defaultLayer),
Layer.fresh(Snapshot.defaultLayer),
).pipe(Layer.provide(ctx))

View File

@@ -18,6 +18,10 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runSyncInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -1781,9 +1781,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
export async function command(input: CommandInput) {
log.info("command", input)
const command = await Command.get(input.command)
if (!command) {
throw new NamedError.Unknown({ message: `Command not found: "${input.command}"` })
}
const agentName = command.agent ?? input.agent ?? (await Agent.defaultAgent())
const raw = input.arguments.match(argsRegex) ?? []

View File

@@ -1,7 +1,8 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { Instance } from "@/project/instance"
import { runSyncInstance } from "@/effect/runtime"
import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
export namespace SessionStatus {
@@ -42,36 +43,51 @@ export namespace SessionStatus {
),
}
const state = Instance.state(() => {
const data: Record<string, Info> = {}
return data
})
export function get(sessionID: SessionID) {
return (
state()[sessionID] ?? {
type: "idle",
}
)
export interface Interface {
readonly get: (sessionID: SessionID) => Effect.Effect<Info>
readonly list: () => Effect.Effect<Record<string, Info>>
readonly set: (sessionID: SessionID, status: Info) => Effect.Effect<void>
}
export function list() {
return state()
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionStatus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const data = new Map<SessionID, Info>()
const get = Effect.fn("SessionStatus.get")(function* (sessionID: SessionID) {
return data.get(sessionID) ?? { type: "idle" as const }
})
const list = Effect.fn("SessionStatus.list")(function* () {
return Object.fromEntries(data)
})
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
Bus.publish(Event.Status, { sessionID, status })
if (status.type === "idle") {
// deprecated
Bus.publish(Event.Idle, { sessionID })
data.delete(sessionID)
return
}
data.set(sessionID, status)
})
return Service.of({ get, list, set })
}),
)
export function get(sessionID: SessionID): Info {
return runSyncInstance(Service.use((svc) => svc.get(sessionID)))
}
export function list(): Record<string, Info> {
return runSyncInstance(Service.use((svc) => svc.list()))
}
export function set(sessionID: SessionID, status: Info) {
Bus.publish(Event.Status, {
sessionID,
status,
})
if (status.type === "idle") {
// deprecated
Bus.publish(Event.Idle, {
sessionID,
})
delete state()[sessionID]
return
}
state()[sessionID] = status
runSyncInstance(Service.use((svc) => svc.set(sessionID, status)))
}
}

View File

@@ -5,9 +5,9 @@ import path from "path"
import { Deferred, Effect, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
@@ -16,6 +16,7 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
@@ -35,17 +36,22 @@ function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
let done = false
const unsub = Bus.subscribe(FileWatcher.Event.Updated, (evt) => {
function on(evt: BusUpdate) {
if (done) return
if (!check(evt.properties)) return
hit(evt.properties)
})
if (evt.directory !== directory) return
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
if (!check(evt.payload.properties)) return
hit(evt.payload.properties)
}
return () => {
function cleanup() {
if (done) return
done = true
unsub()
GlobalBus.off("event", on)
}
GlobalBus.on("event", on)
return cleanup
}
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {