mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-15 18:34:48 +00:00
Compare commits
5 Commits
fix/otel-c
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ba4799b3e | ||
|
|
9640d889ba | ||
|
|
4ae7c77f8a | ||
|
|
f1751401aa | ||
|
|
f06d82b6e8 |
3
bun.lock
3
bun.lock
@@ -512,6 +512,7 @@
|
||||
"effect": "catalog:",
|
||||
},
|
||||
"devDependencies": {
|
||||
"@typescript/native-preview": "catalog:",
|
||||
"typescript": "catalog:",
|
||||
},
|
||||
},
|
||||
@@ -528,9 +529,11 @@
|
||||
"mime-types": "3.0.2",
|
||||
"minimatch": "10.2.5",
|
||||
"semver": "catalog:",
|
||||
"xdg-basedir": "5.1.0",
|
||||
"zod": "catalog:",
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/bun": "catalog:",
|
||||
"@types/semver": "catalog:",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
{
|
||||
"nodeModules": {
|
||||
"x86_64-linux": "sha256-gdS7MkWGeVO0qLs0HKD156YE0uCk5vWeYjKu4JR1Apw=",
|
||||
"aarch64-linux": "sha256-tF4pyVqzbrvdkRG23Fot37FCg8guRZkcU738fHPr/OQ=",
|
||||
"aarch64-darwin": "sha256-FugTWzGMb2ktAbNwQvWRM3GWOb5RTR++8EocDDrQMLc=",
|
||||
"x86_64-darwin": "sha256-jpe6EiwKr+CS00cn0eHwcDluO4LvO3t/5l/LcFBBKP0="
|
||||
"x86_64-linux": "sha256-PvIx2g1J5QIUIzkz2ABaAM4K/k/+xlBPDUExoOJNNuo=",
|
||||
"aarch64-linux": "sha256-YTAL+P13L5hgNJdDSiBED/UNa5zdTntnUUYDYL+Jdzo=",
|
||||
"aarch64-darwin": "sha256-y2VCJifYAp+H0lpDcJ0QfKNMG00Q/usFElaUIpdc8Vs=",
|
||||
"x86_64-darwin": "sha256-yz8edIlqLp06Y95ad8YjKz5azP7YATPle4TcDx6lM+U="
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ import {
|
||||
import { Log } from "../util/log"
|
||||
import { pathToFileURL } from "url"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Hash } from "../util/hash"
|
||||
import { Hash } from "@opencode-ai/shared/util/hash"
|
||||
import { ACPSessionManager } from "./session"
|
||||
import type { ACPConfig } from "./types"
|
||||
import { Provider } from "../provider/provider"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import z from "zod"
|
||||
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { Log } from "../util/log"
|
||||
import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
@@ -128,6 +128,7 @@ export namespace Bus {
|
||||
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
|
||||
return Effect.gen(function* () {
|
||||
log.info("subscribing", { type })
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const scope = yield* Scope.make()
|
||||
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
|
||||
|
||||
@@ -147,7 +148,7 @@ export namespace Bus {
|
||||
|
||||
return () => {
|
||||
log.info("unsubscribing", { type })
|
||||
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
|
||||
bridge.fork(Scope.close(scope, Exit.void))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ import { hasTheme, upsertTheme } from "../context/theme"
|
||||
import { Global } from "@/global"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { Process } from "@/util/process"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { INTERNAL_TUI_PLUGINS, type InternalTuiPlugin } from "./internal"
|
||||
import { setupSlots, Slot as View } from "./slots"
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import z from "zod"
|
||||
import { Config } from "../config/config"
|
||||
import { MCP } from "../mcp"
|
||||
@@ -82,6 +82,7 @@ export namespace Command {
|
||||
|
||||
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
|
||||
const cfg = yield* config.get()
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const commands: Record<string, Info> = {}
|
||||
|
||||
commands[Default.INIT] = {
|
||||
@@ -125,7 +126,7 @@ export namespace Command {
|
||||
source: "mcp",
|
||||
description: prompt.description,
|
||||
get template() {
|
||||
return Effect.runPromise(
|
||||
return bridge.promise(
|
||||
mcp
|
||||
.getPrompt(
|
||||
prompt.client,
|
||||
@@ -141,7 +142,6 @@ export namespace Command {
|
||||
.map((message) => (message.content.type === "text" ? message.content.text : ""))
|
||||
.join("\n") || "",
|
||||
),
|
||||
Effect.provide(EffectLogger.layer),
|
||||
),
|
||||
)
|
||||
},
|
||||
|
||||
@@ -34,7 +34,7 @@ import type { ConsoleState } from "./console-state"
|
||||
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { Context, Duration, Effect, Exit, Fiber, Layer, Option } from "effect"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { isPathPluginSpec, parsePluginSpecifier, resolvePathPluginTarget } from "@/plugin/shared"
|
||||
import { Npm } from "../npm"
|
||||
import { InstanceRef } from "@/effect/instance-ref"
|
||||
|
||||
@@ -12,6 +12,10 @@ export const WorkspaceContext = {
|
||||
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
|
||||
},
|
||||
|
||||
restore<R>(workspaceID: string, fn: () => R): R {
|
||||
return context.provide({ workspaceID }, fn)
|
||||
},
|
||||
|
||||
get workspaceID() {
|
||||
try {
|
||||
return context.use().workspaceID
|
||||
|
||||
49
packages/opencode/src/effect/bridge.ts
Normal file
49
packages/opencode/src/effect/bridge.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { Effect, Fiber } from "effect"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { Instance, type InstanceContext } from "@/project/instance"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { attachWith } from "./run-service"
|
||||
|
||||
export namespace EffectBridge {
|
||||
export interface Shape {
|
||||
readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
|
||||
readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
|
||||
}
|
||||
|
||||
function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
|
||||
if (instance && workspace !== undefined) {
|
||||
return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
|
||||
}
|
||||
if (instance) return Instance.restore(instance, fn)
|
||||
if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
|
||||
return fn()
|
||||
}
|
||||
|
||||
export function make(): Effect.Effect<Shape> {
|
||||
return Effect.gen(function* () {
|
||||
const ctx = yield* Effect.context()
|
||||
const value = yield* InstanceRef
|
||||
const instance =
|
||||
value ??
|
||||
(() => {
|
||||
try {
|
||||
return Instance.current
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
})()
|
||||
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
|
||||
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
|
||||
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>
|
||||
|
||||
return {
|
||||
promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
|
||||
fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
restore(instance, workspace, () => Effect.runFork(wrap(effect))),
|
||||
} satisfies Shape
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { Observability } from "./observability"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
|
||||
export const memoMap = Layer.makeMemoMapUnsafe()
|
||||
|
||||
type Refs = {
|
||||
instance?: InstanceContext
|
||||
workspace?: string
|
||||
}
|
||||
|
||||
export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
|
||||
if (!refs.instance && !refs.workspace) return effect
|
||||
if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
|
||||
if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
|
||||
return effect.pipe(
|
||||
Effect.provideService(InstanceRef, refs.instance),
|
||||
Effect.provideService(WorkspaceRef, refs.workspace),
|
||||
)
|
||||
}
|
||||
|
||||
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
try {
|
||||
const ctx = Instance.current
|
||||
const workspaceID = WorkspaceContext.workspaceID
|
||||
return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
|
||||
return attachWith(effect, {
|
||||
instance: Instance.current,
|
||||
workspace: WorkspaceContext.workspaceID,
|
||||
})
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { xdgData, xdgCache, xdgConfig, xdgState } from "xdg-basedir"
|
||||
import path from "path"
|
||||
import os from "os"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
|
||||
const app = "opencode"
|
||||
|
||||
@@ -26,6 +27,9 @@ export namespace Global {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize Flock with global state path
|
||||
Flock.setGlobal({ state })
|
||||
|
||||
await Promise.all([
|
||||
fs.mkdir(Global.Path.data, { recursive: true }),
|
||||
fs.mkdir(Global.Path.config, { recursive: true }),
|
||||
|
||||
@@ -25,7 +25,7 @@ import { Bus } from "@/bus"
|
||||
import { TuiEvent } from "@/cli/cmd/tui/event"
|
||||
import open from "open"
|
||||
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
@@ -471,25 +471,24 @@ export namespace MCP {
|
||||
Effect.catch(() => Effect.succeed([] as number[])),
|
||||
)
|
||||
|
||||
function watch(s: State, name: string, client: MCPClient, timeout?: number) {
|
||||
function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
|
||||
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
|
||||
log.info("tools list changed notification received", { server: name })
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
|
||||
const listed = await bridge.promise(defs(name, client, timeout))
|
||||
if (!listed) return
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
s.defs[name] = listed
|
||||
await Effect.runPromise(
|
||||
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
|
||||
})
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("MCP.state")(function* () {
|
||||
const cfg = yield* cfgSvc.get()
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const config = cfg.mcp ?? {}
|
||||
const s: State = {
|
||||
status: {},
|
||||
@@ -518,7 +517,7 @@ export namespace MCP {
|
||||
if (result.mcpClient) {
|
||||
s.clients[key] = result.mcpClient
|
||||
s.defs[key] = result.defs!
|
||||
watch(s, key, result.mcpClient, mcp.timeout)
|
||||
watch(s, key, result.mcpClient, bridge, mcp.timeout)
|
||||
}
|
||||
}),
|
||||
{ concurrency: "unbounded" },
|
||||
@@ -565,11 +564,12 @@ export namespace MCP {
|
||||
listed: MCPToolDef[],
|
||||
timeout?: number,
|
||||
) {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
yield* closeClient(s, name)
|
||||
s.status[name] = { status: "connected" }
|
||||
s.clients[name] = client
|
||||
s.defs[name] = listed
|
||||
watch(s, name, client, timeout)
|
||||
watch(s, name, client, bridge, timeout)
|
||||
return s.status[name]
|
||||
})
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import { Log } from "../util/log"
|
||||
import path from "path"
|
||||
import { readdir, rm } from "fs/promises"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { Arborist } from "@npmcli/arborist"
|
||||
|
||||
export namespace Npm {
|
||||
|
||||
@@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
|
||||
import { PoeAuthPlugin } from "opencode-poe-auth"
|
||||
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
|
||||
import { Effect, Layer, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { errorMessage } from "@/util/error"
|
||||
import { PluginLoader } from "./loader"
|
||||
@@ -90,14 +90,6 @@ export namespace Plugin {
|
||||
return result
|
||||
}
|
||||
|
||||
function publishPluginError(bus: Bus.Interface, message: string) {
|
||||
Effect.runFork(
|
||||
bus
|
||||
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
|
||||
.pipe(Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
}
|
||||
|
||||
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
|
||||
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
|
||||
if (plugin) {
|
||||
@@ -120,6 +112,11 @@ export namespace Plugin {
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Plugin.state")(function* (ctx) {
|
||||
const hooks: Hooks[] = []
|
||||
const bridge = yield* EffectBridge.make()
|
||||
|
||||
function publishPluginError(message: string) {
|
||||
bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
|
||||
}
|
||||
|
||||
const { Server } = yield* Effect.promise(() => import("../server/server"))
|
||||
|
||||
@@ -187,24 +184,24 @@ export namespace Plugin {
|
||||
if (stage === "install") {
|
||||
const parsed = parsePluginSpecifier(spec)
|
||||
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
|
||||
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "compatibility") {
|
||||
log.warn("plugin incompatible", { path: spec, error: message })
|
||||
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
|
||||
publishPluginError(`Plugin ${spec} skipped: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "entry") {
|
||||
log.error("failed to resolve plugin server entry", { path: spec, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
},
|
||||
},
|
||||
}),
|
||||
|
||||
@@ -10,7 +10,7 @@ import {
|
||||
import { ConfigPaths } from "@/config/paths"
|
||||
import { Global } from "@/global"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { isRecord } from "@/util/record"
|
||||
|
||||
import { parsePluginSpecifier, readPackageThemes, readPluginPackage, resolvePluginTarget } from "./shared"
|
||||
|
||||
@@ -4,7 +4,7 @@ import { fileURLToPath } from "url"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { Global } from "@/global"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
|
||||
import { parsePluginSpecifier, pluginSource } from "./shared"
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ import { Installation } from "../installation"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { lazy } from "@/util/lazy"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Flock } from "@/util/flock"
|
||||
import { Hash } from "@/util/hash"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { Hash } from "@opencode-ai/shared/util/hash"
|
||||
|
||||
// Try to import bundled snapshot (generated at build time)
|
||||
// Falls back to undefined in dev mode when snapshot doesn't exist
|
||||
|
||||
@@ -6,7 +6,7 @@ import { mapValues, mergeDeep, omit, pickBy, sortBy } from "remeda"
|
||||
import { NoSuchModelError, type Provider as SDK } from "ai"
|
||||
import { Log } from "../util/log"
|
||||
import { Npm } from "../npm"
|
||||
import { Hash } from "../util/hash"
|
||||
import { Hash } from "@opencode-ai/shared/util/hash"
|
||||
import { Plugin } from "../plugin"
|
||||
import { NamedError } from "@opencode-ai/shared/util/error"
|
||||
import { type LanguageModelV3 } from "@ai-sdk/provider"
|
||||
@@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
|
||||
import { Global } from "../global"
|
||||
import path from "path"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
|
||||
import { isRecord } from "@/util/record"
|
||||
@@ -1043,6 +1043,7 @@ export namespace Provider {
|
||||
const state = yield* InstanceState.make<State>(() =>
|
||||
Effect.gen(function* () {
|
||||
using _ = log.time("state")
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const cfg = yield* config.get()
|
||||
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
|
||||
const database = mapValues(modelsDev, fromModelsDevProvider)
|
||||
@@ -1223,8 +1224,7 @@ export namespace Provider {
|
||||
|
||||
const options = yield* Effect.promise(() =>
|
||||
plugin.auth!.loader!(
|
||||
() =>
|
||||
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
|
||||
() => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
|
||||
database[plugin.auth!.provider],
|
||||
),
|
||||
)
|
||||
|
||||
@@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { PtyID } from "./schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
|
||||
export namespace Pty {
|
||||
const log = Log.create({ service: "pty" })
|
||||
@@ -173,6 +173,7 @@ export namespace Pty {
|
||||
|
||||
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const id = PtyID.ascending()
|
||||
const command = input.command || Shell.preferred()
|
||||
const args = input.args || []
|
||||
@@ -256,8 +257,8 @@ export namespace Pty {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
|
||||
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
|
||||
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
|
||||
bridge.fork(remove(id))
|
||||
}),
|
||||
)
|
||||
yield* bus.publish(Event.Created, { info })
|
||||
|
||||
@@ -5,10 +5,12 @@ import { Config } from "../../config/config"
|
||||
import { Provider } from "../../provider/provider"
|
||||
import { mapValues } from "remeda"
|
||||
import { errors } from "../error"
|
||||
import { Log } from "../../util/log"
|
||||
import { lazy } from "../../util/lazy"
|
||||
import { AppRuntime } from "../../effect/app-runtime"
|
||||
import { Effect } from "effect"
|
||||
import { jsonRequest } from "./trace"
|
||||
|
||||
const log = Log.create({ service: "server" })
|
||||
|
||||
export const ConfigRoutes = lazy(() =>
|
||||
new Hono()
|
||||
@@ -29,11 +31,9 @@ export const ConfigRoutes = lazy(() =>
|
||||
},
|
||||
},
|
||||
}),
|
||||
async (c) =>
|
||||
jsonRequest("ConfigRoutes.get", c, function* () {
|
||||
const cfg = yield* Config.Service
|
||||
return yield* cfg.get()
|
||||
}),
|
||||
async (c) => {
|
||||
return c.json(await AppRuntime.runPromise(Config.Service.use((cfg) => cfg.get())))
|
||||
},
|
||||
)
|
||||
.patch(
|
||||
"/",
|
||||
@@ -82,14 +82,18 @@ export const ConfigRoutes = lazy(() =>
|
||||
},
|
||||
},
|
||||
}),
|
||||
async (c) =>
|
||||
jsonRequest("ConfigRoutes.providers", c, function* () {
|
||||
const svc = yield* Provider.Service
|
||||
const providers = mapValues(yield* svc.list(), (item) => item)
|
||||
return {
|
||||
providers: Object.values(providers),
|
||||
default: mapValues(providers, (item) => Provider.sort(Object.values(item.models))[0].id),
|
||||
}
|
||||
}),
|
||||
async (c) => {
|
||||
using _ = log.time("providers")
|
||||
const providers = await AppRuntime.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const svc = yield* Provider.Service
|
||||
return mapValues(yield* svc.list(), (item) => item)
|
||||
}),
|
||||
)
|
||||
return c.json({
|
||||
providers: Object.values(providers),
|
||||
default: mapValues(providers, (item) => Provider.sort(Object.values(item.models))[0].id),
|
||||
})
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
@@ -26,7 +26,6 @@ import { errors } from "../error"
|
||||
import { lazy } from "../../util/lazy"
|
||||
import { Bus } from "../../bus"
|
||||
import { NamedError } from "@opencode-ai/shared/util/error"
|
||||
import { jsonRequest } from "./trace"
|
||||
|
||||
const log = Log.create({ service: "server" })
|
||||
|
||||
@@ -95,11 +94,10 @@ export const SessionRoutes = lazy(() =>
|
||||
...errors(400),
|
||||
},
|
||||
}),
|
||||
async (c) =>
|
||||
jsonRequest("SessionRoutes.status", c, function* () {
|
||||
const svc = yield* SessionStatus.Service
|
||||
return Object.fromEntries(yield* svc.list())
|
||||
}),
|
||||
async (c) => {
|
||||
const result = await AppRuntime.runPromise(SessionStatus.Service.use((svc) => svc.list()))
|
||||
return c.json(Object.fromEntries(result))
|
||||
},
|
||||
)
|
||||
.get(
|
||||
"/:sessionID",
|
||||
@@ -128,10 +126,8 @@ export const SessionRoutes = lazy(() =>
|
||||
),
|
||||
async (c) => {
|
||||
const sessionID = c.req.valid("param").sessionID
|
||||
return jsonRequest("SessionRoutes.get", c, function* () {
|
||||
const session = yield* Session.Service
|
||||
return yield* session.get(sessionID)
|
||||
})
|
||||
const session = await AppRuntime.runPromise(Session.Service.use((svc) => svc.get(sessionID)))
|
||||
return c.json(session)
|
||||
},
|
||||
)
|
||||
.get(
|
||||
@@ -161,10 +157,8 @@ export const SessionRoutes = lazy(() =>
|
||||
),
|
||||
async (c) => {
|
||||
const sessionID = c.req.valid("param").sessionID
|
||||
return jsonRequest("SessionRoutes.children", c, function* () {
|
||||
const session = yield* Session.Service
|
||||
return yield* session.children(sessionID)
|
||||
})
|
||||
const session = await AppRuntime.runPromise(Session.Service.use((svc) => svc.children(sessionID)))
|
||||
return c.json(session)
|
||||
},
|
||||
)
|
||||
.get(
|
||||
@@ -193,10 +187,8 @@ export const SessionRoutes = lazy(() =>
|
||||
),
|
||||
async (c) => {
|
||||
const sessionID = c.req.valid("param").sessionID
|
||||
return jsonRequest("SessionRoutes.todo", c, function* () {
|
||||
const todo = yield* Todo.Service
|
||||
return yield* todo.get(sessionID)
|
||||
})
|
||||
const todos = await AppRuntime.runPromise(Todo.Service.use((svc) => svc.get(sessionID)))
|
||||
return c.json(todos)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
import type { Context } from "hono"
|
||||
import { Effect } from "effect"
|
||||
import { AppRuntime } from "../../effect/app-runtime"
|
||||
|
||||
export function runRequest<A, E>(name: string, c: Context, effect: Effect.Effect<A, E, any>) {
|
||||
const url = new URL(c.req.url)
|
||||
return AppRuntime.runPromise(
|
||||
effect.pipe(
|
||||
Effect.withSpan(name, {
|
||||
attributes: {
|
||||
"http.method": c.req.method,
|
||||
"http.path": url.pathname,
|
||||
},
|
||||
}),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
export async function jsonRequest<C extends Context, A, E>(
|
||||
name: string,
|
||||
c: C,
|
||||
effect: (c: C) => Effect.gen.Return<A, E, any>,
|
||||
) {
|
||||
return c.json(
|
||||
await runRequest(
|
||||
name,
|
||||
c,
|
||||
Effect.gen(() => effect(c)),
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -20,13 +20,12 @@ import { Wildcard } from "@/util/wildcard"
|
||||
import { SessionID } from "@/session/schema"
|
||||
import { Auth } from "@/auth"
|
||||
import { Installation } from "@/installation"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import * as Option from "effect/Option"
|
||||
import * as OtelTracer from "@effect/opentelemetry/Tracer"
|
||||
|
||||
export namespace LLM {
|
||||
const log = Log.create({ service: "llm" })
|
||||
const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
|
||||
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
|
||||
type Result = Awaited<ReturnType<typeof streamText>>
|
||||
|
||||
@@ -57,369 +56,371 @@ export namespace LLM {
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
|
||||
|
||||
export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
|
||||
Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const auth = yield* Auth.Service
|
||||
const config = yield* Config.Service
|
||||
const provider = yield* Provider.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const live: Layer.Layer<
|
||||
Service,
|
||||
never,
|
||||
Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service
|
||||
> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const auth = yield* Auth.Service
|
||||
const config = yield* Config.Service
|
||||
const provider = yield* Provider.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const perm = yield* Permission.Service
|
||||
|
||||
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
|
||||
const l = log
|
||||
.clone()
|
||||
.tag("providerID", input.model.providerID)
|
||||
.tag("modelID", input.model.id)
|
||||
.tag("sessionID", input.sessionID)
|
||||
.tag("small", (input.small ?? false).toString())
|
||||
.tag("agent", input.agent.name)
|
||||
.tag("mode", input.agent.mode)
|
||||
l.info("stream", {
|
||||
modelID: input.model.id,
|
||||
providerID: input.model.providerID,
|
||||
})
|
||||
|
||||
const [language, cfg, item, info] = yield* Effect.all(
|
||||
[
|
||||
provider.getLanguage(input.model),
|
||||
config.get(),
|
||||
provider.getProvider(input.model.providerID),
|
||||
auth.get(input.model.providerID),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
// TODO: move this to a proper hook
|
||||
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
|
||||
|
||||
const system: string[] = []
|
||||
system.push(
|
||||
[
|
||||
// use agent prompt otherwise provider prompt
|
||||
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
|
||||
// any custom prompt passed into this call
|
||||
...input.system,
|
||||
// any custom prompt from last user message
|
||||
...(input.user.system ? [input.user.system] : []),
|
||||
]
|
||||
.filter((x) => x)
|
||||
.join("\n"),
|
||||
)
|
||||
|
||||
const header = system[0]
|
||||
yield* plugin.trigger(
|
||||
"experimental.chat.system.transform",
|
||||
{ sessionID: input.sessionID, model: input.model },
|
||||
{ system },
|
||||
)
|
||||
// rejoin to maintain 2-part structure for caching if header unchanged
|
||||
if (system.length > 2 && system[0] === header) {
|
||||
const rest = system.slice(1)
|
||||
system.length = 0
|
||||
system.push(header, rest.join("\n"))
|
||||
}
|
||||
|
||||
const variant =
|
||||
!input.small && input.model.variants && input.user.model.variant
|
||||
? input.model.variants[input.user.model.variant]
|
||||
: {}
|
||||
const base = input.small
|
||||
? ProviderTransform.smallOptions(input.model)
|
||||
: ProviderTransform.options({
|
||||
model: input.model,
|
||||
sessionID: input.sessionID,
|
||||
providerOptions: item.options,
|
||||
})
|
||||
const options: Record<string, any> = pipe(
|
||||
base,
|
||||
mergeDeep(input.model.options),
|
||||
mergeDeep(input.agent.options),
|
||||
mergeDeep(variant),
|
||||
)
|
||||
if (isOpenaiOauth) {
|
||||
options.instructions = system.join("\n")
|
||||
}
|
||||
|
||||
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
|
||||
const messages = isOpenaiOauth
|
||||
? input.messages
|
||||
: isWorkflow
|
||||
? input.messages
|
||||
: [
|
||||
...system.map(
|
||||
(x): ModelMessage => ({
|
||||
role: "system",
|
||||
content: x,
|
||||
}),
|
||||
),
|
||||
...input.messages,
|
||||
]
|
||||
|
||||
const params = yield* plugin.trigger(
|
||||
"chat.params",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
temperature: input.model.capabilities.temperature
|
||||
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
|
||||
: undefined,
|
||||
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
|
||||
topK: ProviderTransform.topK(input.model),
|
||||
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
|
||||
options,
|
||||
},
|
||||
)
|
||||
|
||||
const { headers } = yield* plugin.trigger(
|
||||
"chat.headers",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
headers: {},
|
||||
},
|
||||
)
|
||||
|
||||
const tools = resolveTools(input)
|
||||
|
||||
// LiteLLM and some Anthropic proxies require the tools parameter to be present
|
||||
// when message history contains tool calls, even if no tools are being used.
|
||||
// Add a dummy tool that is never called to satisfy this validation.
|
||||
// This is enabled for:
|
||||
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
|
||||
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
|
||||
const isLiteLLMProxy =
|
||||
item.options?.["litellmProxy"] === true ||
|
||||
input.model.providerID.toLowerCase().includes("litellm") ||
|
||||
input.model.api.id.toLowerCase().includes("litellm")
|
||||
|
||||
// LiteLLM/Bedrock rejects requests where the message history contains tool
|
||||
// calls but no tools param is present. When there are no active tools (e.g.
|
||||
// during compaction), inject a stub tool to satisfy the validation requirement.
|
||||
// The stub description explicitly tells the model not to call it.
|
||||
if (
|
||||
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
|
||||
Object.keys(tools).length === 0 &&
|
||||
hasToolCalls(input.messages)
|
||||
) {
|
||||
tools["_noop"] = tool({
|
||||
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
|
||||
inputSchema: jsonSchema({
|
||||
type: "object",
|
||||
properties: {
|
||||
reason: { type: "string", description: "Unused" },
|
||||
},
|
||||
}),
|
||||
execute: async () => ({ output: "", title: "", metadata: {} }),
|
||||
})
|
||||
}
|
||||
|
||||
// Wire up toolExecutor for DWS workflow models so that tool calls
|
||||
// from the workflow service are executed via opencode's tool system
|
||||
// and results sent back over the WebSocket.
|
||||
if (language instanceof GitLabWorkflowLanguageModel) {
|
||||
const workflowModel = language as GitLabWorkflowLanguageModel & {
|
||||
sessionID?: string
|
||||
sessionPreapprovedTools?: string[]
|
||||
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
|
||||
}
|
||||
workflowModel.sessionID = input.sessionID
|
||||
workflowModel.systemPrompt = system.join("\n")
|
||||
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
|
||||
const t = tools[toolName]
|
||||
if (!t || !t.execute) {
|
||||
return { result: "", error: `Unknown tool: ${toolName}` }
|
||||
}
|
||||
try {
|
||||
const result = await t.execute!(JSON.parse(argsJson), {
|
||||
toolCallId: _requestID,
|
||||
messages: input.messages,
|
||||
abortSignal: input.abort,
|
||||
})
|
||||
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
|
||||
return {
|
||||
result: output,
|
||||
metadata: typeof result === "object" ? result?.metadata : undefined,
|
||||
title: typeof result === "object" ? result?.title : undefined,
|
||||
}
|
||||
} catch (e: any) {
|
||||
return { result: "", error: e.message ?? String(e) }
|
||||
}
|
||||
}
|
||||
|
||||
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
|
||||
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
|
||||
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
|
||||
return !match || match.action !== "ask"
|
||||
})
|
||||
|
||||
const approvedToolsForSession = new Set<string>()
|
||||
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
|
||||
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
|
||||
// Auto-approve tools that were already approved in this session
|
||||
// (prevents infinite approval loops for server-side MCP tools)
|
||||
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
|
||||
return { approved: true }
|
||||
}
|
||||
|
||||
const id = PermissionID.ascending()
|
||||
let reply: Permission.Reply | undefined
|
||||
let unsub: (() => void) | undefined
|
||||
try {
|
||||
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
|
||||
if (evt.properties.requestID === id) reply = evt.properties.reply
|
||||
})
|
||||
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
|
||||
try {
|
||||
const parsed = JSON.parse(t.args) as Record<string, unknown>
|
||||
const title = (parsed?.title ?? parsed?.name ?? "") as string
|
||||
return title ? `${t.name}: ${title}` : t.name
|
||||
} catch {
|
||||
return t.name
|
||||
}
|
||||
})
|
||||
const uniquePatterns = [...new Set(toolPatterns)] as string[]
|
||||
await perms.runPromise((svc) =>
|
||||
svc.ask({
|
||||
id,
|
||||
sessionID: SessionID.make(input.sessionID),
|
||||
permission: "workflow_tool_approval",
|
||||
patterns: uniquePatterns,
|
||||
metadata: { tools: approvalTools },
|
||||
always: uniquePatterns,
|
||||
ruleset: [],
|
||||
}),
|
||||
)
|
||||
for (const name of uniqueNames) approvedToolsForSession.add(name)
|
||||
workflowModel.sessionPreapprovedTools = [
|
||||
...(workflowModel.sessionPreapprovedTools ?? []),
|
||||
...uniqueNames,
|
||||
]
|
||||
return { approved: true }
|
||||
} catch {
|
||||
return { approved: false }
|
||||
} finally {
|
||||
unsub?.()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const tracer = cfg.experimental?.openTelemetry
|
||||
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
|
||||
: undefined
|
||||
|
||||
return streamText({
|
||||
onError(error) {
|
||||
l.error("stream error", {
|
||||
error,
|
||||
})
|
||||
},
|
||||
async experimental_repairToolCall(failed) {
|
||||
const lower = failed.toolCall.toolName.toLowerCase()
|
||||
if (lower !== failed.toolCall.toolName && tools[lower]) {
|
||||
l.info("repairing tool call", {
|
||||
tool: failed.toolCall.toolName,
|
||||
repaired: lower,
|
||||
})
|
||||
return {
|
||||
...failed.toolCall,
|
||||
toolName: lower,
|
||||
}
|
||||
}
|
||||
return {
|
||||
...failed.toolCall,
|
||||
input: JSON.stringify({
|
||||
tool: failed.toolCall.toolName,
|
||||
error: failed.error.message,
|
||||
}),
|
||||
toolName: "invalid",
|
||||
}
|
||||
},
|
||||
temperature: params.temperature,
|
||||
topP: params.topP,
|
||||
topK: params.topK,
|
||||
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
|
||||
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
|
||||
tools,
|
||||
toolChoice: input.toolChoice,
|
||||
maxOutputTokens: params.maxOutputTokens,
|
||||
abortSignal: input.abort,
|
||||
headers: {
|
||||
...(input.model.providerID.startsWith("opencode")
|
||||
? {
|
||||
"x-opencode-project": Instance.project.id,
|
||||
"x-opencode-session": input.sessionID,
|
||||
"x-opencode-request": input.user.id,
|
||||
"x-opencode-client": Flag.OPENCODE_CLIENT,
|
||||
}
|
||||
: {
|
||||
"x-session-affinity": input.sessionID,
|
||||
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
|
||||
"User-Agent": `opencode/${Installation.VERSION}`,
|
||||
}),
|
||||
...input.model.headers,
|
||||
...headers,
|
||||
},
|
||||
maxRetries: input.retries ?? 0,
|
||||
messages,
|
||||
model: wrapLanguageModel({
|
||||
model: language,
|
||||
middleware: [
|
||||
{
|
||||
specificationVersion: "v3" as const,
|
||||
async transformParams(args) {
|
||||
if (args.type === "stream") {
|
||||
// @ts-expect-error
|
||||
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
|
||||
}
|
||||
return args.params
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
experimental_telemetry: {
|
||||
isEnabled: cfg.experimental?.openTelemetry,
|
||||
functionId: "session.llm",
|
||||
tracer,
|
||||
metadata: {
|
||||
userId: cfg.username ?? "unknown",
|
||||
sessionId: input.sessionID,
|
||||
},
|
||||
},
|
||||
})
|
||||
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
|
||||
const l = log
|
||||
.clone()
|
||||
.tag("providerID", input.model.providerID)
|
||||
.tag("modelID", input.model.id)
|
||||
.tag("sessionID", input.sessionID)
|
||||
.tag("small", (input.small ?? false).toString())
|
||||
.tag("agent", input.agent.name)
|
||||
.tag("mode", input.agent.mode)
|
||||
l.info("stream", {
|
||||
modelID: input.model.id,
|
||||
providerID: input.model.providerID,
|
||||
})
|
||||
|
||||
const stream: Interface["stream"] = (input) =>
|
||||
Stream.scoped(
|
||||
Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ctrl = yield* Effect.acquireRelease(
|
||||
Effect.sync(() => new AbortController()),
|
||||
(ctrl) => Effect.sync(() => ctrl.abort()),
|
||||
)
|
||||
const [language, cfg, item, info] = yield* Effect.all(
|
||||
[
|
||||
provider.getLanguage(input.model),
|
||||
config.get(),
|
||||
provider.getProvider(input.model.providerID),
|
||||
auth.get(input.model.providerID),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
const result = yield* run({ ...input, abort: ctrl.signal })
|
||||
// TODO: move this to a proper hook
|
||||
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
|
||||
|
||||
return Stream.fromAsyncIterable(result.fullStream, (e) =>
|
||||
e instanceof Error ? e : new Error(String(e)),
|
||||
)
|
||||
const system: string[] = []
|
||||
system.push(
|
||||
[
|
||||
// use agent prompt otherwise provider prompt
|
||||
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
|
||||
// any custom prompt passed into this call
|
||||
...input.system,
|
||||
// any custom prompt from last user message
|
||||
...(input.user.system ? [input.user.system] : []),
|
||||
]
|
||||
.filter((x) => x)
|
||||
.join("\n"),
|
||||
)
|
||||
|
||||
const header = system[0]
|
||||
yield* plugin.trigger(
|
||||
"experimental.chat.system.transform",
|
||||
{ sessionID: input.sessionID, model: input.model },
|
||||
{ system },
|
||||
)
|
||||
// rejoin to maintain 2-part structure for caching if header unchanged
|
||||
if (system.length > 2 && system[0] === header) {
|
||||
const rest = system.slice(1)
|
||||
system.length = 0
|
||||
system.push(header, rest.join("\n"))
|
||||
}
|
||||
|
||||
const variant =
|
||||
!input.small && input.model.variants && input.user.model.variant
|
||||
? input.model.variants[input.user.model.variant]
|
||||
: {}
|
||||
const base = input.small
|
||||
? ProviderTransform.smallOptions(input.model)
|
||||
: ProviderTransform.options({
|
||||
model: input.model,
|
||||
sessionID: input.sessionID,
|
||||
providerOptions: item.options,
|
||||
})
|
||||
const options: Record<string, any> = pipe(
|
||||
base,
|
||||
mergeDeep(input.model.options),
|
||||
mergeDeep(input.agent.options),
|
||||
mergeDeep(variant),
|
||||
)
|
||||
if (isOpenaiOauth) {
|
||||
options.instructions = system.join("\n")
|
||||
}
|
||||
|
||||
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
|
||||
const messages = isOpenaiOauth
|
||||
? input.messages
|
||||
: isWorkflow
|
||||
? input.messages
|
||||
: [
|
||||
...system.map(
|
||||
(x): ModelMessage => ({
|
||||
role: "system",
|
||||
content: x,
|
||||
}),
|
||||
),
|
||||
...input.messages,
|
||||
]
|
||||
|
||||
const params = yield* plugin.trigger(
|
||||
"chat.params",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
temperature: input.model.capabilities.temperature
|
||||
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
|
||||
: undefined,
|
||||
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
|
||||
topK: ProviderTransform.topK(input.model),
|
||||
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
|
||||
options,
|
||||
},
|
||||
)
|
||||
|
||||
const { headers } = yield* plugin.trigger(
|
||||
"chat.headers",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
headers: {},
|
||||
},
|
||||
)
|
||||
|
||||
const tools = resolveTools(input)
|
||||
|
||||
// LiteLLM and some Anthropic proxies require the tools parameter to be present
|
||||
// when message history contains tool calls, even if no tools are being used.
|
||||
// Add a dummy tool that is never called to satisfy this validation.
|
||||
// This is enabled for:
|
||||
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
|
||||
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
|
||||
const isLiteLLMProxy =
|
||||
item.options?.["litellmProxy"] === true ||
|
||||
input.model.providerID.toLowerCase().includes("litellm") ||
|
||||
input.model.api.id.toLowerCase().includes("litellm")
|
||||
|
||||
// LiteLLM/Bedrock rejects requests where the message history contains tool
|
||||
// calls but no tools param is present. When there are no active tools (e.g.
|
||||
// during compaction), inject a stub tool to satisfy the validation requirement.
|
||||
// The stub description explicitly tells the model not to call it.
|
||||
if (
|
||||
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
|
||||
Object.keys(tools).length === 0 &&
|
||||
hasToolCalls(input.messages)
|
||||
) {
|
||||
tools["_noop"] = tool({
|
||||
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
|
||||
inputSchema: jsonSchema({
|
||||
type: "object",
|
||||
properties: {
|
||||
reason: { type: "string", description: "Unused" },
|
||||
},
|
||||
}),
|
||||
execute: async () => ({ output: "", title: "", metadata: {} }),
|
||||
})
|
||||
}
|
||||
|
||||
// Wire up toolExecutor for DWS workflow models so that tool calls
|
||||
// from the workflow service are executed via opencode's tool system
|
||||
// and results sent back over the WebSocket.
|
||||
if (language instanceof GitLabWorkflowLanguageModel) {
|
||||
const workflowModel = language as GitLabWorkflowLanguageModel & {
|
||||
sessionID?: string
|
||||
sessionPreapprovedTools?: string[]
|
||||
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
|
||||
}
|
||||
workflowModel.sessionID = input.sessionID
|
||||
workflowModel.systemPrompt = system.join("\n")
|
||||
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
|
||||
const t = tools[toolName]
|
||||
if (!t || !t.execute) {
|
||||
return { result: "", error: `Unknown tool: ${toolName}` }
|
||||
}
|
||||
try {
|
||||
const result = await t.execute!(JSON.parse(argsJson), {
|
||||
toolCallId: _requestID,
|
||||
messages: input.messages,
|
||||
abortSignal: input.abort,
|
||||
})
|
||||
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
|
||||
return {
|
||||
result: output,
|
||||
metadata: typeof result === "object" ? result?.metadata : undefined,
|
||||
title: typeof result === "object" ? result?.title : undefined,
|
||||
}
|
||||
} catch (e: any) {
|
||||
return { result: "", error: e.message ?? String(e) }
|
||||
}
|
||||
}
|
||||
|
||||
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
|
||||
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
|
||||
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
|
||||
return !match || match.action !== "ask"
|
||||
})
|
||||
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const approvedToolsForSession = new Set<string>()
|
||||
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
|
||||
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
|
||||
// Auto-approve tools that were already approved in this session
|
||||
// (prevents infinite approval loops for server-side MCP tools)
|
||||
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
|
||||
return { approved: true }
|
||||
}
|
||||
|
||||
const id = PermissionID.ascending()
|
||||
let reply: Permission.Reply | undefined
|
||||
let unsub: (() => void) | undefined
|
||||
try {
|
||||
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
|
||||
if (evt.properties.requestID === id) reply = evt.properties.reply
|
||||
})
|
||||
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
|
||||
try {
|
||||
const parsed = JSON.parse(t.args) as Record<string, unknown>
|
||||
const title = (parsed?.title ?? parsed?.name ?? "") as string
|
||||
return title ? `${t.name}: ${title}` : t.name
|
||||
} catch {
|
||||
return t.name
|
||||
}
|
||||
})
|
||||
const uniquePatterns = [...new Set(toolPatterns)] as string[]
|
||||
await bridge.promise(
|
||||
perm.ask({
|
||||
id,
|
||||
sessionID: SessionID.make(input.sessionID),
|
||||
permission: "workflow_tool_approval",
|
||||
patterns: uniquePatterns,
|
||||
metadata: { tools: approvalTools },
|
||||
always: uniquePatterns,
|
||||
ruleset: [],
|
||||
}),
|
||||
)
|
||||
for (const name of uniqueNames) approvedToolsForSession.add(name)
|
||||
workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
|
||||
return { approved: true }
|
||||
} catch {
|
||||
return { approved: false }
|
||||
} finally {
|
||||
unsub?.()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const tracer = cfg.experimental?.openTelemetry
|
||||
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
|
||||
: undefined
|
||||
|
||||
return streamText({
|
||||
onError(error) {
|
||||
l.error("stream error", {
|
||||
error,
|
||||
})
|
||||
},
|
||||
async experimental_repairToolCall(failed) {
|
||||
const lower = failed.toolCall.toolName.toLowerCase()
|
||||
if (lower !== failed.toolCall.toolName && tools[lower]) {
|
||||
l.info("repairing tool call", {
|
||||
tool: failed.toolCall.toolName,
|
||||
repaired: lower,
|
||||
})
|
||||
return {
|
||||
...failed.toolCall,
|
||||
toolName: lower,
|
||||
}
|
||||
}
|
||||
return {
|
||||
...failed.toolCall,
|
||||
input: JSON.stringify({
|
||||
tool: failed.toolCall.toolName,
|
||||
error: failed.error.message,
|
||||
}),
|
||||
),
|
||||
)
|
||||
toolName: "invalid",
|
||||
}
|
||||
},
|
||||
temperature: params.temperature,
|
||||
topP: params.topP,
|
||||
topK: params.topK,
|
||||
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
|
||||
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
|
||||
tools,
|
||||
toolChoice: input.toolChoice,
|
||||
maxOutputTokens: params.maxOutputTokens,
|
||||
abortSignal: input.abort,
|
||||
headers: {
|
||||
...(input.model.providerID.startsWith("opencode")
|
||||
? {
|
||||
"x-opencode-project": Instance.project.id,
|
||||
"x-opencode-session": input.sessionID,
|
||||
"x-opencode-request": input.user.id,
|
||||
"x-opencode-client": Flag.OPENCODE_CLIENT,
|
||||
}
|
||||
: {
|
||||
"x-session-affinity": input.sessionID,
|
||||
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
|
||||
"User-Agent": `opencode/${Installation.VERSION}`,
|
||||
}),
|
||||
...input.model.headers,
|
||||
...headers,
|
||||
},
|
||||
maxRetries: input.retries ?? 0,
|
||||
messages,
|
||||
model: wrapLanguageModel({
|
||||
model: language,
|
||||
middleware: [
|
||||
{
|
||||
specificationVersion: "v3" as const,
|
||||
async transformParams(args) {
|
||||
if (args.type === "stream") {
|
||||
// @ts-expect-error
|
||||
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
|
||||
}
|
||||
return args.params
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
experimental_telemetry: {
|
||||
isEnabled: cfg.experimental?.openTelemetry,
|
||||
functionId: "session.llm",
|
||||
tracer,
|
||||
metadata: {
|
||||
userId: cfg.username ?? "unknown",
|
||||
sessionId: input.sessionID,
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
return Service.of({ stream })
|
||||
}),
|
||||
)
|
||||
const stream: Interface["stream"] = (input) =>
|
||||
Stream.scoped(
|
||||
Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ctrl = yield* Effect.acquireRelease(
|
||||
Effect.sync(() => new AbortController()),
|
||||
(ctrl) => Effect.sync(() => ctrl.abort()),
|
||||
)
|
||||
|
||||
const result = yield* run({ ...input, abort: ctrl.signal })
|
||||
|
||||
return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
return Service.of({ stream })
|
||||
}),
|
||||
)
|
||||
|
||||
export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
|
||||
|
||||
export const defaultLayer = Layer.suspend(() =>
|
||||
layer.pipe(
|
||||
|
||||
@@ -48,6 +48,7 @@ import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { TaskTool, type TaskPromptOps } from "@/tool/task"
|
||||
import { SessionRunState } from "./run-state"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
|
||||
// @ts-ignore
|
||||
globalThis.AI_SDK_LOG_WARNINGS = false
|
||||
@@ -105,11 +106,7 @@ export namespace SessionPrompt {
|
||||
const sys = yield* SystemPrompt.Service
|
||||
const llm = yield* LLM.Service
|
||||
const runner = Effect.fn("SessionPrompt.runner")(function* () {
|
||||
const ctx = yield* Effect.context()
|
||||
return {
|
||||
promise: <A, E>(effect: Effect.Effect<A, E>) => Effect.runPromiseWith(ctx)(effect),
|
||||
fork: <A, E>(effect: Effect.Effect<A, E>) => Effect.runForkWith(ctx)(effect),
|
||||
}
|
||||
return yield* EffectBridge.make()
|
||||
})
|
||||
const ops = Effect.fn("SessionPrompt.ops")(function* () {
|
||||
const run = yield* runner()
|
||||
|
||||
@@ -6,7 +6,7 @@ import z from "zod"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
|
||||
import { Hash } from "@/util/hash"
|
||||
import { Hash } from "@opencode-ai/shared/util/hash"
|
||||
import { Config } from "../config/config"
|
||||
import { Global } from "../global"
|
||||
import { Log } from "../util/log"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { expect, test } from "bun:test"
|
||||
import { Context, Effect, Layer, Logger } from "effect"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { EffectBridge } from "../../src/effect/bridge"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { EffectLogger } from "../../src/effect/logger"
|
||||
import { makeRuntime } from "../../src/effect/run-service"
|
||||
@@ -59,3 +60,33 @@ test("AppRuntime attaches InstanceRef from ALS", async () => {
|
||||
|
||||
expect(dir).toBe(tmp.path)
|
||||
})
|
||||
|
||||
test("EffectBridge preserves logger and instance context across async boundaries", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
|
||||
const result = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () =>
|
||||
AppRuntime.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
return yield* Effect.promise(() =>
|
||||
Promise.resolve().then(() =>
|
||||
bridge.promise(
|
||||
Effect.gen(function* () {
|
||||
return {
|
||||
directory: (yield* InstanceRef)?.directory,
|
||||
...check(yield* Effect.service(Logger.CurrentLoggers)),
|
||||
}
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
}),
|
||||
),
|
||||
})
|
||||
|
||||
expect(result.directory).toBe(tmp.path)
|
||||
expect(result.effectLogger).toBe(true)
|
||||
expect(result.defaultLogger).toBe(false)
|
||||
})
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import fs from "fs/promises"
|
||||
import { Flock } from "../../src/util/flock"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
|
||||
type Msg = {
|
||||
key: string
|
||||
|
||||
@@ -17,10 +17,11 @@
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"typecheck": "tsc --noEmit",
|
||||
"typecheck": "tsgo --noEmit",
|
||||
"build": "tsc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@typescript/native-preview": "catalog:",
|
||||
"typescript": "catalog:"
|
||||
},
|
||||
"dependencies": {
|
||||
|
||||
@@ -5,7 +5,9 @@
|
||||
"type": "module",
|
||||
"license": "MIT",
|
||||
"private": true,
|
||||
"scripts": {},
|
||||
"scripts": {
|
||||
"test": "bun test"
|
||||
},
|
||||
"bin": {
|
||||
"opencode": "./bin/opencode"
|
||||
},
|
||||
@@ -14,7 +16,8 @@
|
||||
},
|
||||
"imports": {},
|
||||
"devDependencies": {
|
||||
"@types/semver": "catalog:"
|
||||
"@types/semver": "catalog:",
|
||||
"@types/bun": "catalog:"
|
||||
},
|
||||
"dependencies": {
|
||||
"@effect/platform-node": "catalog:",
|
||||
@@ -23,6 +26,7 @@
|
||||
"mime-types": "3.0.2",
|
||||
"minimatch": "10.2.5",
|
||||
"semver": "catalog:",
|
||||
"xdg-basedir": "5.1.0",
|
||||
"zod": "catalog:"
|
||||
},
|
||||
"overrides": {
|
||||
|
||||
42
packages/shared/src/global.ts
Normal file
42
packages/shared/src/global.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import path from "path"
|
||||
import { xdgData, xdgCache, xdgConfig, xdgState } from "xdg-basedir"
|
||||
import os from "os"
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
|
||||
export namespace Global {
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Global") {}
|
||||
|
||||
export interface Interface {
|
||||
readonly home: string
|
||||
readonly data: string
|
||||
readonly cache: string
|
||||
readonly config: string
|
||||
readonly state: string
|
||||
readonly bin: string
|
||||
readonly log: string
|
||||
}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const app = "opencode"
|
||||
const home = process.env.OPENCODE_TEST_HOME ?? os.homedir()
|
||||
const data = path.join(xdgData!, app)
|
||||
const cache = path.join(xdgCache!, app)
|
||||
const cfg = path.join(xdgConfig!, app)
|
||||
const state = path.join(xdgState!, app)
|
||||
const bin = path.join(cache, "bin")
|
||||
const log = path.join(data, "log")
|
||||
|
||||
return Service.of({
|
||||
home,
|
||||
data,
|
||||
cache,
|
||||
config: cfg,
|
||||
state,
|
||||
bin,
|
||||
log,
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
247
packages/shared/src/npm.ts
Normal file
247
packages/shared/src/npm.ts
Normal file
@@ -0,0 +1,247 @@
|
||||
import path from "path"
|
||||
import semver from "semver"
|
||||
import { Arborist } from "@npmcli/arborist"
|
||||
import { Effect, Schema, Context, Layer, Option, FileSystem } from "effect"
|
||||
import { NodeFileSystem } from "@effect/platform-node"
|
||||
import { AppFileSystem } from "./filesystem"
|
||||
import { Global } from "./global"
|
||||
import { Flock } from "./util/flock"
|
||||
|
||||
export namespace Npm {
|
||||
export class InstallFailedError extends Schema.TaggedErrorClass<InstallFailedError>()("NpmInstallFailedError", {
|
||||
pkg: Schema.String,
|
||||
cause: Schema.optional(Schema.Defect),
|
||||
}) {}
|
||||
|
||||
export interface EntryPoint {
|
||||
readonly directory: string
|
||||
readonly entrypoint: Option.Option<string>
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly add: (pkg: string) => Effect.Effect<EntryPoint, InstallFailedError>
|
||||
readonly install: (dir: string) => Effect.Effect<void>
|
||||
readonly outdated: (pkg: string, cachedVersion: string) => Effect.Effect<boolean>
|
||||
readonly which: (pkg: string) => Effect.Effect<Option.Option<string>>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Npm") {}
|
||||
|
||||
const illegal = process.platform === "win32" ? new Set(["<", ">", ":", '"', "|", "?", "*"]) : undefined
|
||||
|
||||
export function sanitize(pkg: string) {
|
||||
if (!illegal) return pkg
|
||||
return Array.from(pkg, (char) => (illegal.has(char) || char.charCodeAt(0) < 32 ? "_" : char)).join("")
|
||||
}
|
||||
|
||||
const resolveEntryPoint = (name: string, dir: string): EntryPoint => {
|
||||
let entrypoint: Option.Option<string>
|
||||
try {
|
||||
const resolved = typeof Bun !== "undefined" ? import.meta.resolve(name, dir) : import.meta.resolve(dir)
|
||||
entrypoint = Option.some(resolved)
|
||||
} catch {
|
||||
entrypoint = Option.none()
|
||||
}
|
||||
return {
|
||||
directory: dir,
|
||||
entrypoint,
|
||||
}
|
||||
}
|
||||
|
||||
interface ArboristNode {
|
||||
name: string
|
||||
path: string
|
||||
}
|
||||
|
||||
interface ArboristTree {
|
||||
edgesOut: Map<string, { to?: ArboristNode }>
|
||||
}
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const afs = yield* AppFileSystem.Service
|
||||
const global = yield* Global.Service
|
||||
const fs = yield* FileSystem.FileSystem
|
||||
const directory = (pkg: string) => path.join(global.cache, "packages", sanitize(pkg))
|
||||
|
||||
const outdated = Effect.fn("Npm.outdated")(function* (pkg: string, cachedVersion: string) {
|
||||
const response = yield* Effect.tryPromise({
|
||||
try: () => fetch(`https://registry.npmjs.org/${pkg}`),
|
||||
catch: () => undefined,
|
||||
}).pipe(Effect.orElseSucceed(() => undefined))
|
||||
|
||||
if (!response || !response.ok) {
|
||||
return false
|
||||
}
|
||||
|
||||
const data = yield* Effect.tryPromise({
|
||||
try: () => response.json() as Promise<{ "dist-tags"?: { latest?: string } }>,
|
||||
catch: () => undefined,
|
||||
}).pipe(Effect.orElseSucceed(() => undefined))
|
||||
|
||||
const latestVersion = data?.["dist-tags"]?.latest
|
||||
if (!latestVersion) {
|
||||
return false
|
||||
}
|
||||
|
||||
const range = /[\s^~*xX<>|=]/.test(cachedVersion)
|
||||
if (range) return !semver.satisfies(latestVersion, cachedVersion)
|
||||
|
||||
return semver.lt(cachedVersion, latestVersion)
|
||||
})
|
||||
|
||||
const add = Effect.fn("Npm.add")(function* (pkg: string) {
|
||||
const dir = directory(pkg)
|
||||
yield* Flock.effect(`npm-install:${dir}`)
|
||||
|
||||
const arborist = new Arborist({
|
||||
path: dir,
|
||||
binLinks: true,
|
||||
progress: false,
|
||||
savePrefix: "",
|
||||
ignoreScripts: true,
|
||||
})
|
||||
|
||||
const tree = yield* Effect.tryPromise({
|
||||
try: () => arborist.loadVirtual().catch(() => undefined),
|
||||
catch: () => undefined,
|
||||
}).pipe(Effect.orElseSucceed(() => undefined)) as Effect.Effect<ArboristTree | undefined>
|
||||
|
||||
if (tree) {
|
||||
const first = tree.edgesOut.values().next().value?.to
|
||||
if (first) {
|
||||
return resolveEntryPoint(first.name, first.path)
|
||||
}
|
||||
}
|
||||
|
||||
const result = yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
arborist.reify({
|
||||
add: [pkg],
|
||||
save: true,
|
||||
saveType: "prod",
|
||||
}),
|
||||
catch: (cause) => new InstallFailedError({ pkg, cause }),
|
||||
}) as Effect.Effect<ArboristTree, InstallFailedError>
|
||||
|
||||
const first = result.edgesOut.values().next().value?.to
|
||||
if (!first) {
|
||||
return yield* new InstallFailedError({ pkg })
|
||||
}
|
||||
|
||||
return resolveEntryPoint(first.name, first.path)
|
||||
}, Effect.scoped)
|
||||
|
||||
const install = Effect.fn("Npm.install")(function* (dir: string) {
|
||||
yield* Flock.effect(`npm-install:${dir}`)
|
||||
|
||||
const reify = Effect.fnUntraced(function* () {
|
||||
const arb = new Arborist({
|
||||
path: dir,
|
||||
binLinks: true,
|
||||
progress: false,
|
||||
savePrefix: "",
|
||||
ignoreScripts: true,
|
||||
})
|
||||
yield* Effect.tryPromise({
|
||||
try: () => arb.reify().catch(() => {}),
|
||||
catch: () => {},
|
||||
}).pipe(Effect.orElseSucceed(() => {}))
|
||||
})
|
||||
|
||||
const nodeModulesExists = yield* afs.existsSafe(path.join(dir, "node_modules"))
|
||||
if (!nodeModulesExists) {
|
||||
yield* reify()
|
||||
return
|
||||
}
|
||||
|
||||
const pkg = yield* afs.readJson(path.join(dir, "package.json")).pipe(Effect.orElseSucceed(() => ({})))
|
||||
const lock = yield* afs.readJson(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => ({})))
|
||||
|
||||
const pkgAny = pkg as any
|
||||
const lockAny = lock as any
|
||||
|
||||
const declared = new Set([
|
||||
...Object.keys(pkgAny?.dependencies || {}),
|
||||
...Object.keys(pkgAny?.devDependencies || {}),
|
||||
...Object.keys(pkgAny?.peerDependencies || {}),
|
||||
...Object.keys(pkgAny?.optionalDependencies || {}),
|
||||
])
|
||||
|
||||
const root = lockAny?.packages?.[""] || {}
|
||||
const locked = new Set([
|
||||
...Object.keys(root?.dependencies || {}),
|
||||
...Object.keys(root?.devDependencies || {}),
|
||||
...Object.keys(root?.peerDependencies || {}),
|
||||
...Object.keys(root?.optionalDependencies || {}),
|
||||
])
|
||||
|
||||
for (const name of declared) {
|
||||
if (!locked.has(name)) {
|
||||
yield* reify()
|
||||
return
|
||||
}
|
||||
}
|
||||
}, Effect.scoped)
|
||||
|
||||
const which = Effect.fn("Npm.which")(function* (pkg: string) {
|
||||
const dir = directory(pkg)
|
||||
const binDir = path.join(dir, "node_modules", ".bin")
|
||||
|
||||
const pick = Effect.fnUntraced(function* () {
|
||||
const files = yield* fs.readDirectory(binDir).pipe(Effect.catch(() => Effect.succeed([] as string[])))
|
||||
|
||||
if (files.length === 0) return Option.none<string>()
|
||||
if (files.length === 1) return Option.some(files[0])
|
||||
|
||||
const pkgJson = yield* afs.readJson(path.join(dir, "node_modules", pkg, "package.json")).pipe(Effect.option)
|
||||
|
||||
if (Option.isSome(pkgJson)) {
|
||||
const parsed = pkgJson.value as { bin?: string | Record<string, string> }
|
||||
if (parsed?.bin) {
|
||||
const unscoped = pkg.startsWith("@") ? pkg.split("/")[1] : pkg
|
||||
const bin = parsed.bin
|
||||
if (typeof bin === "string") return Option.some(unscoped)
|
||||
const keys = Object.keys(bin)
|
||||
if (keys.length === 1) return Option.some(keys[0])
|
||||
return bin[unscoped] ? Option.some(unscoped) : Option.some(keys[0])
|
||||
}
|
||||
}
|
||||
|
||||
return Option.some(files[0])
|
||||
})
|
||||
|
||||
return yield* Effect.gen(function* () {
|
||||
const bin = yield* pick()
|
||||
if (Option.isSome(bin)) {
|
||||
return Option.some(path.join(binDir, bin.value))
|
||||
}
|
||||
|
||||
yield* fs.remove(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => {}))
|
||||
|
||||
yield* add(pkg)
|
||||
|
||||
const resolved = yield* pick()
|
||||
if (Option.isNone(resolved)) return Option.none<string>()
|
||||
return Option.some(path.join(binDir, resolved.value))
|
||||
}).pipe(
|
||||
Effect.scoped,
|
||||
Effect.orElseSucceed(() => Option.none<string>()),
|
||||
)
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
add,
|
||||
install,
|
||||
outdated,
|
||||
which,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(AppFileSystem.layer),
|
||||
Layer.provide(Global.layer),
|
||||
Layer.provide(NodeFileSystem.layer),
|
||||
)
|
||||
}
|
||||
44
packages/shared/src/types.d.ts
vendored
Normal file
44
packages/shared/src/types.d.ts
vendored
Normal file
@@ -0,0 +1,44 @@
|
||||
declare module "@npmcli/arborist" {
|
||||
export interface ArboristOptions {
|
||||
path: string
|
||||
binLinks?: boolean
|
||||
progress?: boolean
|
||||
savePrefix?: string
|
||||
ignoreScripts?: boolean
|
||||
}
|
||||
|
||||
export interface ArboristNode {
|
||||
name: string
|
||||
path: string
|
||||
}
|
||||
|
||||
export interface ArboristEdge {
|
||||
to?: ArboristNode
|
||||
}
|
||||
|
||||
export interface ArboristTree {
|
||||
edgesOut: Map<string, ArboristEdge>
|
||||
}
|
||||
|
||||
export interface ReifyOptions {
|
||||
add?: string[]
|
||||
save?: boolean
|
||||
saveType?: "prod" | "dev" | "optional" | "peer"
|
||||
}
|
||||
|
||||
export class Arborist {
|
||||
constructor(options: ArboristOptions)
|
||||
loadVirtual(): Promise<ArboristTree | undefined>
|
||||
reify(options?: ReifyOptions): Promise<ArboristTree>
|
||||
}
|
||||
}
|
||||
|
||||
declare var Bun:
|
||||
| {
|
||||
file(path: string): {
|
||||
text(): Promise<string>
|
||||
json(): Promise<unknown>
|
||||
}
|
||||
write(path: string, content: string | Uint8Array): Promise<void>
|
||||
}
|
||||
| undefined
|
||||
@@ -2,11 +2,25 @@ import path from "path"
|
||||
import os from "os"
|
||||
import { randomBytes, randomUUID } from "crypto"
|
||||
import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises"
|
||||
import { Global } from "@/global"
|
||||
import { Hash } from "@/util/hash"
|
||||
import { Hash } from "./hash"
|
||||
import { Effect } from "effect"
|
||||
|
||||
export type FlockGlobal = {
|
||||
state: string
|
||||
}
|
||||
|
||||
export namespace Flock {
|
||||
const root = path.join(Global.Path.state, "locks")
|
||||
let global: FlockGlobal | undefined
|
||||
|
||||
export function setGlobal(g: FlockGlobal) {
|
||||
global = g
|
||||
}
|
||||
|
||||
const root = () => {
|
||||
if (!global) throw new Error("Flock global not set")
|
||||
return path.join(global.state, "locks")
|
||||
}
|
||||
|
||||
// Defaults for callers that do not provide timing options.
|
||||
const defaultOpts = {
|
||||
staleMs: 60_000,
|
||||
@@ -301,7 +315,7 @@ export namespace Flock {
|
||||
baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs,
|
||||
maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs,
|
||||
}
|
||||
const dir = input.dir ?? root
|
||||
const dir = input.dir ?? root()
|
||||
|
||||
await mkdir(dir, { recursive: true })
|
||||
const lockfile = path.join(dir, Hash.fast(key) + ".lock")
|
||||
@@ -330,4 +344,11 @@ export namespace Flock {
|
||||
input.signal?.throwIfAborted()
|
||||
return await fn()
|
||||
}
|
||||
|
||||
export const effect = Effect.fn("Flock.effect")(function* (key: string) {
|
||||
return yield* Effect.acquireRelease(
|
||||
Effect.promise((signal) => Flock.acquire(key, { signal })),
|
||||
(foo) => Effect.promise(() => foo.release()),
|
||||
).pipe(Effect.asVoid)
|
||||
})
|
||||
}
|
||||
338
packages/shared/test/filesystem/filesystem.test.ts
Normal file
338
packages/shared/test/filesystem/filesystem.test.ts
Normal file
@@ -0,0 +1,338 @@
|
||||
import { describe, test, expect } from "bun:test"
|
||||
import { Effect, Layer, FileSystem } from "effect"
|
||||
import { NodeFileSystem } from "@effect/platform-node"
|
||||
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import path from "path"
|
||||
|
||||
const live = AppFileSystem.layer.pipe(Layer.provideMerge(NodeFileSystem.layer))
|
||||
const { effect: it } = testEffect(live)
|
||||
|
||||
describe("AppFileSystem", () => {
|
||||
describe("isDir", () => {
|
||||
it(
|
||||
"returns true for directories",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
expect(yield* fs.isDir(tmp)).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"returns false for files",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "test.txt")
|
||||
yield* filesys.writeFileString(file, "hello")
|
||||
expect(yield* fs.isDir(file)).toBe(false)
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"returns false for non-existent paths",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
expect(yield* fs.isDir("/tmp/nonexistent-" + Math.random())).toBe(false)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("isFile", () => {
|
||||
it(
|
||||
"returns true for files",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "test.txt")
|
||||
yield* filesys.writeFileString(file, "hello")
|
||||
expect(yield* fs.isFile(file)).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"returns false for directories",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
expect(yield* fs.isFile(tmp)).toBe(false)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("readJson / writeJson", () => {
|
||||
it(
|
||||
"round-trips JSON data",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "data.json")
|
||||
const data = { name: "test", count: 42, nested: { ok: true } }
|
||||
|
||||
yield* fs.writeJson(file, data)
|
||||
const result = yield* fs.readJson(file)
|
||||
|
||||
expect(result).toEqual(data)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("ensureDir", () => {
|
||||
it(
|
||||
"creates nested directories",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const nested = path.join(tmp, "a", "b", "c")
|
||||
|
||||
yield* fs.ensureDir(nested)
|
||||
|
||||
const info = yield* filesys.stat(nested)
|
||||
expect(info.type).toBe("Directory")
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"is idempotent",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const dir = path.join(tmp, "existing")
|
||||
yield* filesys.makeDirectory(dir)
|
||||
|
||||
yield* fs.ensureDir(dir)
|
||||
|
||||
const info = yield* filesys.stat(dir)
|
||||
expect(info.type).toBe("Directory")
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("writeWithDirs", () => {
|
||||
it(
|
||||
"creates parent directories if missing",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "deep", "nested", "file.txt")
|
||||
|
||||
yield* fs.writeWithDirs(file, "hello")
|
||||
|
||||
expect(yield* filesys.readFileString(file)).toBe("hello")
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"writes directly when parent exists",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "direct.txt")
|
||||
|
||||
yield* fs.writeWithDirs(file, "world")
|
||||
|
||||
expect(yield* filesys.readFileString(file)).toBe("world")
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"writes Uint8Array content",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "binary.bin")
|
||||
const content = new Uint8Array([0x00, 0x01, 0x02, 0x03])
|
||||
|
||||
yield* fs.writeWithDirs(file, content)
|
||||
|
||||
const result = yield* filesys.readFile(file)
|
||||
expect(new Uint8Array(result)).toEqual(content)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("findUp", () => {
|
||||
it(
|
||||
"finds target in start directory",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "target.txt"), "found")
|
||||
|
||||
const result = yield* fs.findUp("target.txt", tmp)
|
||||
expect(result).toEqual([path.join(tmp, "target.txt")])
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"finds target in parent directories",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "marker"), "root")
|
||||
const child = path.join(tmp, "a", "b")
|
||||
yield* filesys.makeDirectory(child, { recursive: true })
|
||||
|
||||
const result = yield* fs.findUp("marker", child, tmp)
|
||||
expect(result).toEqual([path.join(tmp, "marker")])
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"returns empty array when not found",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const result = yield* fs.findUp("nonexistent", tmp, tmp)
|
||||
expect(result).toEqual([])
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("up", () => {
|
||||
it(
|
||||
"finds multiple targets walking up",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "a.txt"), "a")
|
||||
yield* filesys.writeFileString(path.join(tmp, "b.txt"), "b")
|
||||
const child = path.join(tmp, "sub")
|
||||
yield* filesys.makeDirectory(child)
|
||||
yield* filesys.writeFileString(path.join(child, "a.txt"), "a-child")
|
||||
|
||||
const result = yield* fs.up({ targets: ["a.txt", "b.txt"], start: child, stop: tmp })
|
||||
|
||||
expect(result).toContain(path.join(child, "a.txt"))
|
||||
expect(result).toContain(path.join(tmp, "a.txt"))
|
||||
expect(result).toContain(path.join(tmp, "b.txt"))
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("glob", () => {
|
||||
it(
|
||||
"finds files matching pattern",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "a.ts"), "a")
|
||||
yield* filesys.writeFileString(path.join(tmp, "b.ts"), "b")
|
||||
yield* filesys.writeFileString(path.join(tmp, "c.json"), "c")
|
||||
|
||||
const result = yield* fs.glob("*.ts", { cwd: tmp })
|
||||
expect(result.sort()).toEqual(["a.ts", "b.ts"])
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"supports absolute paths",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "file.txt"), "hello")
|
||||
|
||||
const result = yield* fs.glob("*.txt", { cwd: tmp, absolute: true })
|
||||
expect(result).toEqual([path.join(tmp, "file.txt")])
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("globMatch", () => {
|
||||
it(
|
||||
"matches patterns",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
expect(fs.globMatch("*.ts", "foo.ts")).toBe(true)
|
||||
expect(fs.globMatch("*.ts", "foo.json")).toBe(false)
|
||||
expect(fs.globMatch("src/**", "src/a/b.ts")).toBe(true)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("globUp", () => {
|
||||
it(
|
||||
"finds files walking up directories",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
yield* filesys.writeFileString(path.join(tmp, "root.md"), "root")
|
||||
const child = path.join(tmp, "a", "b")
|
||||
yield* filesys.makeDirectory(child, { recursive: true })
|
||||
yield* filesys.writeFileString(path.join(child, "leaf.md"), "leaf")
|
||||
|
||||
const result = yield* fs.globUp("*.md", child, tmp)
|
||||
expect(result).toContain(path.join(child, "leaf.md"))
|
||||
expect(result).toContain(path.join(tmp, "root.md"))
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("built-in passthrough", () => {
|
||||
it(
|
||||
"exists works",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "exists.txt")
|
||||
yield* filesys.writeFileString(file, "yes")
|
||||
|
||||
expect(yield* filesys.exists(file)).toBe(true)
|
||||
expect(yield* filesys.exists(file + ".nope")).toBe(false)
|
||||
}),
|
||||
)
|
||||
|
||||
it(
|
||||
"remove works",
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const filesys = yield* FileSystem.FileSystem
|
||||
const tmp = yield* filesys.makeTempDirectoryScoped()
|
||||
const file = path.join(tmp, "delete-me.txt")
|
||||
yield* filesys.writeFileString(file, "bye")
|
||||
|
||||
yield* filesys.remove(file)
|
||||
|
||||
expect(yield* filesys.exists(file)).toBe(false)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("pure helpers", () => {
|
||||
test("mimeType returns correct types", () => {
|
||||
expect(AppFileSystem.mimeType("file.json")).toBe("application/json")
|
||||
expect(AppFileSystem.mimeType("image.png")).toBe("image/png")
|
||||
expect(AppFileSystem.mimeType("unknown.qzx")).toBe("application/octet-stream")
|
||||
})
|
||||
|
||||
test("contains checks path containment", () => {
|
||||
expect(AppFileSystem.contains("/a/b", "/a/b/c")).toBe(true)
|
||||
expect(AppFileSystem.contains("/a/b", "/a/c")).toBe(false)
|
||||
})
|
||||
|
||||
test("overlaps detects overlapping paths", () => {
|
||||
expect(AppFileSystem.overlaps("/a/b", "/a/b/c")).toBe(true)
|
||||
expect(AppFileSystem.overlaps("/a/b/c", "/a/b")).toBe(true)
|
||||
expect(AppFileSystem.overlaps("/a", "/b")).toBe(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
72
packages/shared/test/fixture/flock-worker.ts
Normal file
72
packages/shared/test/fixture/flock-worker.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import fs from "fs/promises"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
|
||||
type Msg = {
|
||||
key: string
|
||||
dir: string
|
||||
staleMs?: number
|
||||
timeoutMs?: number
|
||||
baseDelayMs?: number
|
||||
maxDelayMs?: number
|
||||
holdMs?: number
|
||||
ready?: string
|
||||
active?: string
|
||||
done?: string
|
||||
}
|
||||
|
||||
function sleep(ms: number) {
|
||||
return new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, ms)
|
||||
})
|
||||
}
|
||||
|
||||
function input() {
|
||||
const raw = process.argv[2]
|
||||
if (!raw) {
|
||||
throw new Error("Missing flock worker input")
|
||||
}
|
||||
|
||||
return JSON.parse(raw) as Msg
|
||||
}
|
||||
|
||||
async function job(input: Msg) {
|
||||
if (input.ready) {
|
||||
await fs.writeFile(input.ready, String(process.pid))
|
||||
}
|
||||
|
||||
if (input.active) {
|
||||
await fs.writeFile(input.active, String(process.pid), { flag: "wx" })
|
||||
}
|
||||
|
||||
try {
|
||||
if (input.holdMs && input.holdMs > 0) {
|
||||
await sleep(input.holdMs)
|
||||
}
|
||||
|
||||
if (input.done) {
|
||||
await fs.appendFile(input.done, "1\n")
|
||||
}
|
||||
} finally {
|
||||
if (input.active) {
|
||||
await fs.rm(input.active, { force: true })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const msg = input()
|
||||
|
||||
await Flock.withLock(msg.key, () => job(msg), {
|
||||
dir: msg.dir,
|
||||
staleMs: msg.staleMs,
|
||||
timeoutMs: msg.timeoutMs,
|
||||
baseDelayMs: msg.baseDelayMs,
|
||||
maxDelayMs: msg.maxDelayMs,
|
||||
})
|
||||
}
|
||||
|
||||
await main().catch((err) => {
|
||||
const text = err instanceof Error ? (err.stack ?? err.message) : String(err)
|
||||
process.stderr.write(text)
|
||||
process.exit(1)
|
||||
})
|
||||
53
packages/shared/test/lib/effect.ts
Normal file
53
packages/shared/test/lib/effect.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { test, type TestOptions } from "bun:test"
|
||||
import { Cause, Effect, Exit, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as TestClock from "effect/testing/TestClock"
|
||||
import * as TestConsole from "effect/testing/TestConsole"
|
||||
|
||||
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
|
||||
|
||||
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
|
||||
|
||||
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
|
||||
if (Exit.isFailure(exit)) {
|
||||
for (const err of Cause.prettyErrors(exit.cause)) {
|
||||
yield* Effect.logError(err)
|
||||
}
|
||||
}
|
||||
return yield* exit
|
||||
}).pipe(Effect.runPromise)
|
||||
|
||||
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
|
||||
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test(name, () => run(value, testLayer), opts)
|
||||
|
||||
effect.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.only(name, () => run(value, testLayer), opts)
|
||||
|
||||
effect.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.skip(name, () => run(value, testLayer), opts)
|
||||
|
||||
const live = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test(name, () => run(value, liveLayer), opts)
|
||||
|
||||
live.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.only(name, () => run(value, liveLayer), opts)
|
||||
|
||||
live.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.skip(name, () => run(value, liveLayer), opts)
|
||||
|
||||
return { effect, live }
|
||||
}
|
||||
|
||||
// Test environment with TestClock and TestConsole
|
||||
const testEnv = Layer.mergeAll(TestConsole.layer, TestClock.layer())
|
||||
|
||||
// Live environment - uses real clock, but keeps TestConsole for output capture
|
||||
const liveEnv = TestConsole.layer
|
||||
|
||||
export const it = make(testEnv, liveEnv)
|
||||
|
||||
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
|
||||
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))
|
||||
18
packages/shared/test/npm.test.ts
Normal file
18
packages/shared/test/npm.test.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { Npm } from "@opencode-ai/shared/npm"
|
||||
|
||||
const win = process.platform === "win32"
|
||||
|
||||
describe("Npm.sanitize", () => {
|
||||
test("keeps normal scoped package specs unchanged", () => {
|
||||
expect(Npm.sanitize("@opencode/acme")).toBe("@opencode/acme")
|
||||
expect(Npm.sanitize("@opencode/acme@1.0.0")).toBe("@opencode/acme@1.0.0")
|
||||
expect(Npm.sanitize("prettier")).toBe("prettier")
|
||||
})
|
||||
|
||||
test("handles git https specs", () => {
|
||||
const spec = "acme@git+https://github.com/opencode/acme.git"
|
||||
const expected = win ? "acme@git+https_//github.com/opencode/acme.git" : spec
|
||||
expect(Npm.sanitize(spec)).toBe(expected)
|
||||
})
|
||||
})
|
||||
@@ -1,14 +1,10 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import fs from "fs/promises"
|
||||
import { spawn } from "child_process"
|
||||
import path from "path"
|
||||
import { Flock } from "../../src/util/flock"
|
||||
import { Hash } from "../../src/util/hash"
|
||||
import { Process } from "../../src/util/process"
|
||||
import { Filesystem } from "../../src/util/filesystem"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
const root = path.join(import.meta.dir, "../..")
|
||||
const worker = path.join(import.meta.dir, "../fixture/flock-worker.ts")
|
||||
import os from "os"
|
||||
import { Flock } from "@opencode-ai/shared/util/flock"
|
||||
import { Hash } from "@opencode-ai/shared/util/hash"
|
||||
|
||||
type Msg = {
|
||||
key: string
|
||||
@@ -23,6 +19,19 @@ type Msg = {
|
||||
done?: string
|
||||
}
|
||||
|
||||
const root = path.join(import.meta.dir, "../..")
|
||||
const worker = path.join(import.meta.dir, "../fixture/flock-worker.ts")
|
||||
|
||||
async function tmpdir() {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "flock-test-"))
|
||||
return {
|
||||
path: dir,
|
||||
async [Symbol.asyncDispose]() {
|
||||
await fs.rm(dir, { recursive: true, force: true })
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function lock(dir: string, key: string) {
|
||||
return path.join(dir, Hash.fast(key) + ".lock")
|
||||
}
|
||||
@@ -51,21 +60,55 @@ async function wait(file: string, timeout = 3_000) {
|
||||
}
|
||||
|
||||
function run(msg: Msg) {
|
||||
return Process.run([process.execPath, worker, JSON.stringify(msg)], {
|
||||
cwd: root,
|
||||
nothrow: true,
|
||||
return new Promise<{ code: number; stdout: Buffer; stderr: Buffer }>((resolve) => {
|
||||
const proc = spawn(process.execPath, [worker, JSON.stringify(msg)], {
|
||||
cwd: root,
|
||||
})
|
||||
|
||||
const stdout: Buffer[] = []
|
||||
const stderr: Buffer[] = []
|
||||
|
||||
proc.stdout?.on("data", (data) => stdout.push(Buffer.from(data)))
|
||||
proc.stderr?.on("data", (data) => stderr.push(Buffer.from(data)))
|
||||
|
||||
proc.on("close", (code) => {
|
||||
resolve({
|
||||
code: code ?? 1,
|
||||
stdout: Buffer.concat(stdout),
|
||||
stderr: Buffer.concat(stderr),
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function spawn(msg: Msg) {
|
||||
return Process.spawn([process.execPath, worker, JSON.stringify(msg)], {
|
||||
function spawnWorker(msg: Msg) {
|
||||
return spawn(process.execPath, [worker, JSON.stringify(msg)], {
|
||||
cwd: root,
|
||||
stdin: "ignore",
|
||||
stdout: "pipe",
|
||||
stderr: "pipe",
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
})
|
||||
}
|
||||
|
||||
function stopWorker(proc: ReturnType<typeof spawnWorker>) {
|
||||
if (proc.exitCode !== null || proc.signalCode !== null) return Promise.resolve()
|
||||
|
||||
if (process.platform !== "win32" || !proc.pid) {
|
||||
proc.kill()
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const killProc = spawn("taskkill", ["/pid", String(proc.pid), "/T", "/F"])
|
||||
killProc.on("close", () => {
|
||||
proc.kill()
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async function readJson<T>(p: string): Promise<T> {
|
||||
return JSON.parse(await fs.readFile(p, "utf8"))
|
||||
}
|
||||
|
||||
describe("util.flock", () => {
|
||||
test("enforces mutual exclusion under process contention", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
@@ -104,7 +147,7 @@ describe("util.flock", () => {
|
||||
const dir = path.join(tmp.path, "locks")
|
||||
const key = "flock:timeout"
|
||||
const ready = path.join(tmp.path, "ready")
|
||||
const proc = spawn({
|
||||
const proc = spawnWorker({
|
||||
key,
|
||||
dir,
|
||||
ready,
|
||||
@@ -131,8 +174,8 @@ describe("util.flock", () => {
|
||||
expect(seen.length).toBeGreaterThan(0)
|
||||
expect(seen.every((x) => x === key)).toBe(true)
|
||||
} finally {
|
||||
await Process.stop(proc).catch(() => undefined)
|
||||
await proc.exited.catch(() => undefined)
|
||||
await stopWorker(proc).catch(() => undefined)
|
||||
await new Promise((resolve) => proc.on("close", resolve))
|
||||
}
|
||||
}, 15_000)
|
||||
|
||||
@@ -141,7 +184,7 @@ describe("util.flock", () => {
|
||||
const dir = path.join(tmp.path, "locks")
|
||||
const key = "flock:crash"
|
||||
const ready = path.join(tmp.path, "ready")
|
||||
const proc = spawn({
|
||||
const proc = spawnWorker({
|
||||
key,
|
||||
dir,
|
||||
ready,
|
||||
@@ -151,8 +194,8 @@ describe("util.flock", () => {
|
||||
})
|
||||
|
||||
await wait(ready, 5_000)
|
||||
await Process.stop(proc)
|
||||
await proc.exited.catch(() => undefined)
|
||||
await stopWorker(proc)
|
||||
await new Promise((resolve) => proc.on("close", resolve))
|
||||
|
||||
let hit = false
|
||||
await Flock.withLock(
|
||||
@@ -276,7 +319,7 @@ describe("util.flock", () => {
|
||||
await Flock.withLock(
|
||||
key,
|
||||
async () => {
|
||||
const json = await Filesystem.readJson<{
|
||||
const json = await readJson<{
|
||||
token?: unknown
|
||||
pid?: unknown
|
||||
hostname?: unknown
|
||||
@@ -324,7 +367,7 @@ describe("util.flock", () => {
|
||||
const err = await Flock.withLock(
|
||||
key,
|
||||
async () => {
|
||||
const json = await Filesystem.readJson<{ token?: string }>(meta)
|
||||
const json = await readJson<{ token?: string }>(meta)
|
||||
json.token = "tampered"
|
||||
await fs.writeFile(meta, JSON.stringify(json, null, 2))
|
||||
},
|
||||
Reference in New Issue
Block a user