Compare commits

...

3 Commits

Author SHA1 Message Date
Kit Langton
8c566cb764 refactor(vcs): effectify VcsService as scoped service
Convert Vcs from Instance.state namespace to an Effect ServiceMap.Service
on the Instances LayerMap. Uses Instance.bind for the Bus.subscribe
callback (same ALS pattern as FileWatcherService). Branch state is
managed in the layer closure with scope-based cleanup.
2026-03-15 14:05:57 -04:00
Kit Langton
388c173b56 fix(vcs): invert HEAD filter so branch changes are detected
The condition was `endsWith("HEAD")` (skip HEAD events) instead of
`!endsWith("HEAD")` (only process HEAD events). Since the git watcher
only watches HEAD, this meant BranchUpdated never fired.
2026-03-15 13:42:20 -04:00
Kit Langton
3984bd061e test(vcs): add tests for branch detection and HEAD event publishing
Two tests pass (branch() reads current branch correctly).
Two tests fail: BranchUpdated never fires because the FileWatcher
event filter in vcs.ts has an inverted condition — it skips HEAD
changes instead of filtering for them.
2026-03-15 13:39:55 -04:00
10 changed files with 259 additions and 89 deletions

View File

@@ -0,0 +1,13 @@
import { ServiceMap } from "effect"
import type { Project } from "@/project/project"
export declare namespace InstanceContext {
export interface Shape {
readonly directory: string
readonly project: Project.Info
}
}
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
"opencode/InstanceContext",
) {}

View File

@@ -1,24 +1,21 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { registerDisposer } from "./instance-registry"
import { InstanceContext } from "./instance-context"
import { ProviderAuthService } from "@/provider/auth-service"
import { QuestionService } from "@/question/service"
import { PermissionService } from "@/permission/service"
import { FileWatcherService } from "@/file/watcher"
import { VcsService } from "@/project/vcs"
import { Instance } from "@/project/instance"
import type { Project } from "@/project/project"
export declare namespace InstanceContext {
export interface Shape {
readonly directory: string
readonly project: Project.Info
}
}
export { InstanceContext } from "./instance-context"
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
"opencode/InstanceContext",
) {}
export type InstanceServices = QuestionService | PermissionService | ProviderAuthService | FileWatcherService
export type InstanceServices =
| QuestionService
| PermissionService
| ProviderAuthService
| FileWatcherService
| VcsService
function lookup(directory: string) {
const project = Instance.project
@@ -28,6 +25,7 @@ function lookup(directory: string) {
Layer.fresh(PermissionService.layer),
Layer.fresh(ProviderAuthService.layer),
Layer.fresh(FileWatcherService.layer),
Layer.fresh(VcsService.layer),
).pipe(Layer.provide(ctx))
}

View File

@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instances"
import { InstanceContext } from "@/effect/instance-context"
import { Instance } from "@/project/instance"
import z from "zod"
import { Log } from "../util/log"

View File

@@ -1,6 +1,6 @@
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceContext } from "@/effect/instances"
import { InstanceContext } from "@/effect/instance-context"
import { ProjectID } from "@/project/schema"
import { MessageID, SessionID } from "@/session/schema"
import { PermissionTable } from "@/session/session.sql"

View File

@@ -7,7 +7,7 @@ import { Project } from "./project"
import { Bus } from "../bus"
import { Command } from "../command"
import { Instance } from "./instance"
import { Vcs } from "./vcs"
import { VcsService } from "./vcs"
import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Snapshot } from "../snapshot"
@@ -22,7 +22,7 @@ export async function InstanceBootstrap() {
await LSP.init()
await runPromiseInstance(FileWatcherService.use((service) => service.init()))
File.init()
Vcs.init()
await runPromiseInstance(VcsService.use((s) => s.init()))
Snapshot.init()
Truncate.init()

View File

@@ -1,11 +1,12 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import path from "path"
import z from "zod"
import { Log } from "@/util/log"
import { Instance } from "./instance"
import { InstanceContext } from "@/effect/instance-context"
import { FileWatcher } from "@/file/watcher"
import { git } from "@/util/git"
import { Effect, Layer, ServiceMap } from "effect"
const log = Log.create({ service: "vcs" })
@@ -27,50 +28,57 @@ export namespace Vcs {
ref: "VcsInfo",
})
export type Info = z.infer<typeof Info>
}
async function currentBranch() {
const result = await git(["rev-parse", "--abbrev-ref", "HEAD"], {
cwd: Instance.worktree,
})
if (result.exitCode !== 0) return
const text = result.text().trim()
if (!text) return
return text
}
const state = Instance.state(
async () => {
if (Instance.project.vcs !== "git") {
return { branch: async () => undefined, unsubscribe: undefined }
}
let current = await currentBranch()
log.info("initialized", { branch: current })
const unsubscribe = Bus.subscribe(FileWatcher.Event.Updated, async (evt) => {
if (evt.properties.file.endsWith("HEAD")) return
const next = await currentBranch()
if (next !== current) {
log.info("branch changed", { from: current, to: next })
current = next
Bus.publish(Event.BranchUpdated, { branch: next })
}
})
return {
branch: async () => current,
unsubscribe,
}
},
async (state) => {
state.unsubscribe?.()
},
)
export async function init() {
return state()
}
export async function branch() {
return await state().then((s) => s.branch())
export namespace VcsService {
export interface Service {
readonly init: () => Effect.Effect<void>
readonly branch: () => Effect.Effect<string | undefined>
}
}
export class VcsService extends ServiceMap.Service<VcsService, VcsService.Service>()("@opencode/Vcs") {
static readonly layer = Layer.effect(
VcsService,
Effect.gen(function* () {
const instance = yield* InstanceContext
let current: string | undefined
if (instance.project.vcs === "git") {
const currentBranch = async () => {
const result = await git(["rev-parse", "--abbrev-ref", "HEAD"], {
cwd: instance.project.worktree,
})
if (result.exitCode !== 0) return undefined
const text = result.text().trim()
return text || undefined
}
current = yield* Effect.promise(() => currentBranch())
log.info("initialized", { branch: current })
const unsubscribe = Bus.subscribe(
FileWatcher.Event.Updated,
Instance.bind(async (evt) => {
if (!evt.properties.file.endsWith("HEAD")) return
const next = await currentBranch()
if (next !== current) {
log.info("branch changed", { from: current, to: next })
current = next
Bus.publish(Vcs.Event.BranchUpdated, { branch: next })
}
}),
)
yield* Effect.addFinalizer(() => Effect.sync(unsubscribe))
}
return VcsService.of({
init: Effect.fn("VcsService.init")(function* () {}),
branch: Effect.fn("VcsService.branch")(function* () {
return current
}),
})
}),
)
}

View File

@@ -14,7 +14,8 @@ import { LSP } from "../lsp"
import { Format } from "../format"
import { TuiRoutes } from "./routes/tui"
import { Instance } from "../project/instance"
import { Vcs } from "../project/vcs"
import { Vcs, VcsService } from "../project/vcs"
import { runPromiseInstance } from "@/effect/runtime"
import { Agent } from "../agent/agent"
import { Skill } from "../skill/skill"
import { Auth } from "../auth"
@@ -330,7 +331,7 @@ export namespace Server {
},
}),
async (c) => {
const branch = await Vcs.branch()
const branch = await runPromiseInstance(VcsService.use((s) => s.branch()))
return c.json({
branch,
})

View File

@@ -2,10 +2,10 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { ConfigProvider, Deferred, Effect, Fiber, Layer, ManagedRuntime, Option } from "effect"
import { Deferred, Effect, Fiber, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
import { InstanceContext } from "../../src/effect/instances"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
@@ -16,35 +16,21 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
const configLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcherService. Runtime is acquired/released via Effect.scoped. */
/** Run `body` with a live FileWatcherService. */
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
return Instance.provide({
return withServices(
directory,
fn: () =>
Effect.gen(function* () {
const ctx = Layer.sync(InstanceContext, () =>
InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
)
const layer = Layer.fresh(FileWatcherService.layer).pipe(Layer.provide(ctx), Layer.provide(configLayer))
const rt = yield* Effect.acquireRelease(
Effect.sync(() => ManagedRuntime.make(layer)),
(rt) => Effect.promise(() => rt.dispose()),
)
yield* Effect.promise(() => rt.runPromise(FileWatcherService.use((s) => s.init())))
yield* ready(directory)
yield* body
}).pipe(Effect.scoped, Effect.runPromise),
})
FileWatcherService.layer,
async (rt) => {
await rt.runPromise(FileWatcherService.use((s) => s.init()))
await Effect.runPromise(ready(directory))
await Effect.runPromise(body)
},
{ provide: [watcherConfigLayer] },
)
}
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {

View File

@@ -0,0 +1,47 @@
import { ConfigProvider, Layer, ManagedRuntime } from "effect"
import { InstanceContext } from "../../src/effect/instance-context"
import { Instance } from "../../src/project/instance"
/** ConfigProvider that enables the experimental file watcher. */
export const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}),
)
/**
* Boot an Instance with the given service layers and run `body` with
* the ManagedRuntime. Cleanup is automatic — the runtime is disposed
* and Instance context is torn down when `body` completes.
*
* Layers may depend on InstanceContext (provided automatically).
* Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
*/
export function withServices<S>(
directory: string,
layer: Layer.Layer<S, any, InstanceContext>,
body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
options?: { provide?: Layer.Layer<never>[] },
) {
return Instance.provide({
directory,
fn: async () => {
const ctx = Layer.sync(InstanceContext, () =>
InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
)
let resolved: Layer.Layer<S> = Layer.fresh(layer).pipe(Layer.provide(ctx)) as any
if (options?.provide) {
for (const l of options.provide) {
resolved = resolved.pipe(Layer.provide(l)) as any
}
}
const rt = ManagedRuntime.make(resolved)
try {
await body(rt)
} finally {
await rt.dispose()
}
},
})
}

View File

@@ -0,0 +1,117 @@
import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Layer, ManagedRuntime } from "effect"
import { tmpdir } from "../fixture/fixture"
import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
import { Vcs, VcsService } from "../../src/project/vcs"
// Skip in CI — native @parcel/watcher binding needed
const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function withVcs(
directory: string,
body: (rt: ManagedRuntime.ManagedRuntime<FileWatcherService | VcsService, never>) => Promise<void>,
) {
return withServices(
directory,
Layer.merge(FileWatcherService.layer, VcsService.layer),
async (rt) => {
await rt.runPromise(FileWatcherService.use((s) => s.init()))
await rt.runPromise(VcsService.use((s) => s.init()))
await Bun.sleep(200)
await body(rt)
},
{ provide: [watcherConfigLayer] },
)
}
type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
/** Wait for a Vcs.Event.BranchUpdated event on GlobalBus */
function nextBranchUpdate(directory: string, timeout = 5000) {
return new Promise<string | undefined>((resolve, reject) => {
const timer = setTimeout(() => {
GlobalBus.off("event", on)
reject(new Error("timed out waiting for BranchUpdated event"))
}, timeout)
function on(evt: BranchEvent) {
if (evt.directory !== directory) return
if (evt.payload.type !== Vcs.Event.BranchUpdated.type) return
clearTimeout(timer)
GlobalBus.off("event", on)
resolve(evt.payload.properties.branch)
}
GlobalBus.on("event", on)
})
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describeVcs("Vcs", () => {
afterEach(() => Instance.disposeAll())
test("branch() returns current branch name", async () => {
await using tmp = await tmpdir({ git: true })
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(branch).toBeDefined()
expect(typeof branch).toBe("string")
})
})
test("branch() returns undefined for non-git directories", async () => {
await using tmp = await tmpdir()
await withVcs(tmp.path, async (rt) => {
const branch = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(branch).toBeUndefined()
})
})
test("publishes BranchUpdated when .git/HEAD changes", async () => {
await using tmp = await tmpdir({ git: true })
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async () => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
const updated = await pending
expect(updated).toBe(branch)
})
})
test("branch() reflects the new branch after HEAD change", async () => {
await using tmp = await tmpdir({ git: true })
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
await withVcs(tmp.path, async (rt) => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
await pending
const current = await rt.runPromise(VcsService.use((s) => s.branch()))
expect(current).toBe(branch)
})
})
})