Compare commits

..

2 Commits

Author SHA1 Message Date
Aiden Cline
df4362a719 ignore: example tui plugin 2026-04-15 11:37:36 -05:00
Kit Langton
f1751401aa fix(effect): add effect bridge for callback contexts (#22504) 2026-04-15 15:22:34 +00:00
28 changed files with 562 additions and 465 deletions

View File

@@ -0,0 +1,25 @@
import type { TuiPluginModule } from "@opencode-ai/plugin/tui"
let seen = false
const plugin: TuiPluginModule & { id: string } = {
id: "local.config-once-toast",
async tui(api) {
if (seen) return
const cfg = api.state.config
if (cfg.plugin !== undefined && !Array.isArray(cfg.plugin)) {
throw new Error("Invalid config: plugin must be an array")
}
const mdl = typeof cfg.model === "string" && cfg.model.trim() ? cfg.model : "default"
seen = true
api.ui.toast({
title: "Config check",
message: `This is a 1 time toast, validating ur config (model: ${mdl})`,
variant: "info",
})
},
}
export default plugin

View File

@@ -1,6 +1,7 @@
{
"$schema": "https://opencode.ai/tui.json",
"plugin": [
"./plugins/tui-config-once-toast.tsx",
[
"./plugins/tui-smoke.tsx",
{

View File

@@ -510,6 +510,7 @@
"effect": "catalog:",
},
"devDependencies": {
"@typescript/native-preview": "catalog:",
"typescript": "catalog:",
},
},

View File

@@ -33,6 +33,7 @@ import {
import { Log } from "../util/log"
import { pathToFileURL } from "url"
import { Filesystem } from "../util/filesystem"
import { Hash } from "../util/hash"
import { ACPSessionManager } from "./session"
import type { ACPConfig } from "./types"
@@ -47,9 +48,7 @@ import { Todo } from "@/session/todo"
import { z } from "zod"
import { LoadAPIKeyError } from "ai"
import type { AssistantMessage, Event, OpencodeClient, SessionMessageResponse, ToolPart } from "@opencode-ai/sdk/v2"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { applyPatch } from "diff"
import { Effect } from "effect"
type ModeOption = { id: string; name: string; description?: string }
type ModelOption = { modelId: string; name: string }
@@ -239,13 +238,7 @@ export namespace ACP {
const metadata = permission.metadata || {}
const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : ""
const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : ""
const content = await AppRuntime.runPromise(
AppFileSystem.Service.use((fs) =>
fs
.existsSafe(filepath)
.pipe(Effect.flatMap((exists) => (exists ? fs.readFileString(filepath) : Effect.succeed("")))),
),
)
const content = (await Filesystem.exists(filepath)) ? await Filesystem.readText(filepath) : ""
const newContent = getNewContent(content, diff)
if (newContent) {

View File

@@ -1,6 +1,6 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { Log } from "../util/log"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
@@ -128,6 +128,7 @@ export namespace Bus {
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
log.info("subscribing", { type })
const bridge = yield* EffectBridge.make()
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
@@ -147,7 +148,7 @@ export namespace Bus {
return () => {
log.info("unsubscribing", { type })
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
bridge.fork(Scope.close(scope, Exit.void))
}
})
}

View File

@@ -7,11 +7,11 @@ import { Agent } from "../../agent/agent"
import { Provider } from "../../provider/provider"
import path from "path"
import fs from "fs/promises"
import { Filesystem } from "../../util/filesystem"
import matter from "gray-matter"
import { Instance } from "../../project/instance"
import { EOL } from "os"
import type { Argv } from "yargs"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
type AgentMode = "all" | "primary" | "subagent"
@@ -194,7 +194,7 @@ const AgentCreateCommand = cmd({
await fs.mkdir(targetPath, { recursive: true })
if (await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(filePath)))) {
if (await Filesystem.exists(filePath)) {
if (isFullyNonInteractive) {
console.error(`Error: Agent file already exists: ${filePath}`)
process.exit(1)
@@ -203,7 +203,7 @@ const AgentCreateCommand = cmd({
throw new UI.CancelledError()
}
await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.writeWithDirs(filePath, content)))
await Filesystem.write(filePath, content)
if (isFullyNonInteractive) {
console.log(filePath)

View File

@@ -1,5 +1,6 @@
import path from "path"
import { exec } from "child_process"
import { Filesystem } from "../../util/filesystem"
import * as prompts from "@clack/prompts"
import { map, pipe, sortBy, values } from "remeda"
import { Octokit } from "@octokit/rest"
@@ -33,7 +34,6 @@ import { Git } from "@/git"
import { setTimeout as sleep } from "node:timers/promises"
import { Process } from "@/util/process"
import { Effect } from "effect"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
type GitHubAuthor = {
login: string
@@ -381,11 +381,9 @@ export const GithubInstallCommand = cmd({
? ""
: `\n env:${providers[provider].env.map((e) => `\n ${e}: \${{ secrets.${e} }}`).join("")}`
await AppRuntime.runPromise(
AppFileSystem.Service.use((fs) =>
fs.writeWithDirs(
path.join(app.root, WORKFLOW_FILE),
`name: opencode
await Filesystem.write(
path.join(app.root, WORKFLOW_FILE),
`name: opencode
on:
issue_comment:
@@ -416,8 +414,6 @@ jobs:
uses: anomalyco/opencode/github@latest${envStr}
with:
model: ${provider}/${model}`,
),
),
)
prompts.log.success(`Added workflow file: "${WORKFLOW_FILE}"`)

View File

@@ -9,8 +9,8 @@ import { SessionTable, MessageTable, PartTable } from "../../session/session.sql
import { Instance } from "../../project/instance"
import { ShareNext } from "../../share/share-next"
import { EOL } from "os"
import { Filesystem } from "../../util/filesystem"
import { AppRuntime } from "@/effect/app-runtime"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
/** Discriminated union returned by the ShareNext API (GET /api/shares/:id/data) */
export type ShareData =
@@ -140,9 +140,7 @@ export const ImportCommand = cmd({
exportData = transformed
} else {
exportData = (await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.readJson(args.file))).catch(
() => undefined,
)) as NonNullable<typeof exportData> | undefined
exportData = await Filesystem.readJson<NonNullable<typeof exportData>>(args.file).catch(() => undefined)
if (!exportData) {
process.stdout.write(`File not found: ${args.file}`)
process.stdout.write(EOL)

View File

@@ -13,10 +13,10 @@ import { Installation } from "../../installation"
import path from "path"
import { Global } from "../../global"
import { modify, applyEdits } from "jsonc-parser"
import { Filesystem } from "../../util/filesystem"
import { Bus } from "../../bus"
import { AppRuntime } from "../../effect/app-runtime"
import { Effect } from "effect"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
function getAuthStatusIcon(status: MCP.AuthStatus): string {
switch (status) {
@@ -416,7 +416,7 @@ async function resolveConfigPath(baseDir: string, global = false) {
}
for (const candidate of candidates) {
if (await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(candidate)))) {
if (await Filesystem.exists(candidate)) {
return candidate
}
}
@@ -427,8 +427,8 @@ async function resolveConfigPath(baseDir: string, global = false) {
async function addMcpToConfig(name: string, mcpConfig: Config.Mcp, configPath: string) {
let text = "{}"
if (await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(configPath)))) {
text = await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.readFileString(configPath)))
if (await Filesystem.exists(configPath)) {
text = await Filesystem.readText(configPath)
}
// Use jsonc-parser to modify while preserving comments
@@ -437,7 +437,7 @@ async function addMcpToConfig(name: string, mcpConfig: Config.Mcp, configPath: s
})
const result = applyEdits(text, edits)
await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.writeWithDirs(configPath, result)))
await Filesystem.write(configPath, result)
return configPath
}

View File

@@ -1,12 +1,12 @@
import type { Argv } from "yargs"
import path from "path"
import { pathToFileURL } from "url"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { UI } from "../ui"
import { cmd } from "./cmd"
import { Flag } from "../../flag/flag"
import { bootstrap } from "../bootstrap"
import { EOL } from "os"
import { Filesystem } from "../../util/filesystem"
import { createOpencodeClient, type OpencodeClient, type ToolPart } from "@opencode-ai/sdk/v2"
import { Server } from "../../server/server"
import { Provider } from "../../provider/provider"
@@ -332,14 +332,12 @@ export const RunCommand = cmd({
for (const filePath of list) {
const resolvedPath = path.resolve(process.cwd(), filePath)
if (!(await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(resolvedPath))))) {
if (!(await Filesystem.exists(resolvedPath))) {
UI.error(`File not found: ${filePath}`)
process.exit(1)
}
const mime = (await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.isDir(resolvedPath))))
? "application/x-directory"
: "text/plain"
const mime = (await Filesystem.isDir(resolvedPath)) ? "application/x-directory" : "text/plain"
files.push({
type: "file",

View File

@@ -7,8 +7,8 @@ import { Global } from "../../global"
import fs from "fs/promises"
import path from "path"
import os from "os"
import { Filesystem } from "../../util/filesystem"
import { Process } from "../../util/process"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
interface UninstallArgs {
keepConfig: boolean
@@ -266,9 +266,7 @@ async function getShellConfigFile(): Promise<string | null> {
.catch(() => false)
if (!exists) continue
const content = await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.readFileString(file))).catch(
() => "",
)
const content = await Filesystem.readText(file).catch(() => "")
if (content.includes("# opencode") || content.includes(".opencode/bin")) {
return file
}
@@ -278,7 +276,7 @@ async function getShellConfigFile(): Promise<string | null> {
}
async function cleanShellConfig(file: string) {
const content = await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.readFileString(file)))
const content = await Filesystem.readText(file)
const lines = content.split("\n")
const filtered: string[] = []
@@ -314,7 +312,7 @@ async function cleanShellConfig(file: string) {
}
const output = filtered.join("\n") + "\n"
await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.writeWithDirs(file, output)))
await Filesystem.write(file, output)
}
async function getDirectorySize(dir: string): Promise<number> {

View File

@@ -1,9 +1,9 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { EffectBridge } from "@/effect/bridge"
import type { InstanceContext } from "@/project/instance"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import z from "zod"
import { Config } from "../config/config"
import { MCP } from "../mcp"
@@ -82,6 +82,7 @@ export namespace Command {
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
const cfg = yield* config.get()
const bridge = yield* EffectBridge.make()
const commands: Record<string, Info> = {}
commands[Default.INIT] = {
@@ -125,7 +126,7 @@ export namespace Command {
source: "mcp",
description: prompt.description,
get template() {
return Effect.runPromise(
return bridge.promise(
mcp
.getPrompt(
prompt.client,
@@ -141,7 +142,6 @@ export namespace Command {
.map((message) => (message.content.type === "text" ? message.content.text : ""))
.join("\n") || "",
),
Effect.provide(EffectLogger.layer),
),
)
},

View File

@@ -1,8 +1,7 @@
import { NamedError } from "@opencode-ai/shared/util/error"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import matter from "gray-matter"
import { z } from "zod"
import { AppRuntime } from "@/effect/app-runtime"
import { Filesystem } from "../util/filesystem"
export namespace ConfigMarkdown {
export const FILE_REGEX = /(?<![\w`])@(\.?[^\s`,.]*(?:\.[^\s`,.]+)*)/g
@@ -70,7 +69,7 @@ export namespace ConfigMarkdown {
}
export async function parse(filePath: string) {
const template = await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.readFileString(filePath)))
const template = await Filesystem.readText(filePath)
try {
const md = matter(template)

View File

@@ -12,6 +12,10 @@ export const WorkspaceContext = {
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
},
restore<R>(workspaceID: string, fn: () => R): R {
return context.provide({ workspaceID }, fn)
},
get workspaceID() {
try {
return context.use().workspaceID

View File

@@ -9,9 +9,9 @@ import { SyncEvent } from "@/sync"
import { EventTable } from "@/sync/event.sql"
import { Flag } from "@/flag/flag"
import { Log } from "@/util/log"
import { Filesystem } from "@/util/filesystem"
import { ProjectID } from "@/project/schema"
import { Slug } from "@opencode-ai/shared/util/slug"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { WorkspaceTable } from "./workspace.sql"
import { getAdaptor } from "./adaptors"
import { WorkspaceInfo } from "./types"
@@ -418,7 +418,7 @@ export namespace Workspace {
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
if (space.type === "worktree") {
void AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(space.directory!))).then((exists) => {
void Filesystem.exists(space.directory!).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return

View File

@@ -0,0 +1,49 @@
import { Effect, Fiber } from "effect"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { Instance, type InstanceContext } from "@/project/instance"
import { LocalContext } from "@/util/local-context"
import { InstanceRef, WorkspaceRef } from "./instance-ref"
import { attachWith } from "./run-service"
export namespace EffectBridge {
export interface Shape {
readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
}
function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
if (instance && workspace !== undefined) {
return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
}
if (instance) return Instance.restore(instance, fn)
if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
return fn()
}
export function make(): Effect.Effect<Shape> {
return Effect.gen(function* () {
const ctx = yield* Effect.context()
const value = yield* InstanceRef
const instance =
value ??
(() => {
try {
return Instance.current
} catch (err) {
if (!(err instanceof LocalContext.NotFound)) throw err
}
})()
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>
return {
promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
restore(instance, workspace, () => Effect.runFork(wrap(effect))),
} satisfies Shape
})
}
}

View File

@@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
import { InstanceRef, WorkspaceRef } from "./instance-ref"
import { Observability } from "./observability"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import type { InstanceContext } from "@/project/instance"
export const memoMap = Layer.makeMemoMapUnsafe()
type Refs = {
instance?: InstanceContext
workspace?: string
}
export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
if (!refs.instance && !refs.workspace) return effect
if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
return effect.pipe(
Effect.provideService(InstanceRef, refs.instance),
Effect.provideService(WorkspaceRef, refs.workspace),
)
}
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
try {
const ctx = Instance.current
const workspaceID = WorkspaceContext.workspaceID
return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
return attachWith(effect, {
instance: Instance.current,
workspace: WorkspaceContext.workspaceID,
})
} catch (err) {
if (!(err instanceof LocalContext.NotFound)) throw err
}

View File

@@ -1,11 +1,11 @@
import fs from "fs/promises"
import path from "path"
import { fileURLToPath } from "url"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import z from "zod"
import { Cause, Context, Effect, Layer, Queue, Stream } from "effect"
import { ripgrep } from "ripgrep"
import { makeRuntime } from "@/effect/run-service"
import { Filesystem } from "@/util/filesystem"
import { Log } from "@/util/log"
export namespace Ripgrep {
@@ -275,11 +275,10 @@ export namespace Ripgrep {
return Effect.succeed(OPENCODE_RIPGREP_WORKER_PATH)
}
const js = new URL("./ripgrep.worker.js", import.meta.url)
return Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const exists = yield* fs.exists(fileURLToPath(js)).pipe(Effect.orElseSucceed(() => false))
return exists ? js : new URL("./ripgrep.worker.ts", import.meta.url)
})
return Effect.tryPromise({
try: () => Filesystem.exists(fileURLToPath(js)),
catch: toError,
}).pipe(Effect.map((exists) => (exists ? js : new URL("./ripgrep.worker.ts", import.meta.url))))
}
function worker() {

View File

@@ -1,9 +1,8 @@
import fs from "fs/promises"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { xdgData, xdgCache, xdgConfig, xdgState } from "xdg-basedir"
import path from "path"
import os from "os"
import { Effect } from "effect"
import { Filesystem } from "../util/filesystem"
const app = "opencode"
@@ -37,11 +36,7 @@ await Promise.all([
const CACHE_VERSION = "21"
const version = await Effect.runPromise(
AppFileSystem.Service.use((fs) => fs.readFileString(path.join(Global.Path.cache, "version"))).pipe(
Effect.provide(AppFileSystem.defaultLayer),
),
).catch(() => "0")
const version = await Filesystem.readText(path.join(Global.Path.cache, "version")).catch(() => "0")
if (version !== CACHE_VERSION) {
try {
@@ -55,9 +50,5 @@ if (version !== CACHE_VERSION) {
),
)
} catch (e) {}
await Effect.runPromise(
AppFileSystem.Service.use((fs) => fs.writeWithDirs(path.join(Global.Path.cache, "version"), CACHE_VERSION)).pipe(
Effect.provide(AppFileSystem.defaultLayer),
),
)
await Filesystem.write(path.join(Global.Path.cache, "version"), CACHE_VERSION)
}

View File

@@ -11,10 +11,10 @@ import { UninstallCommand } from "./cli/cmd/uninstall"
import { ModelsCommand } from "./cli/cmd/models"
import { UI } from "./cli/ui"
import { Installation } from "./installation"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { NamedError } from "@opencode-ai/shared/util/error"
import { FormatError } from "./cli/error"
import { ServeCommand } from "./cli/cmd/serve"
import { Filesystem } from "./util/filesystem"
import { DebugCommand } from "./cli/cmd/debug"
import { StatsCommand } from "./cli/cmd/stats"
import { McpCommand } from "./cli/cmd/mcp"
@@ -37,7 +37,6 @@ import { errorMessage } from "./util/error"
import { PluginCommand } from "./cli/cmd/plug"
import { Heap } from "./cli/heap"
import { drizzle } from "drizzle-orm/bun-sqlite"
import { AppRuntime } from "@/effect/app-runtime"
process.on("unhandledRejection", (e) => {
Log.Default.error("rejection", {
@@ -111,7 +110,7 @@ const cli = yargs(args)
})
const marker = path.join(Global.Path.data, "opencode.db")
if (!(await AppRuntime.runPromise(AppFileSystem.Service.use((fs) => fs.existsSafe(marker))))) {
if (!(await Filesystem.exists(marker))) {
const tty = process.stderr.isTTY
process.stderr.write("Performing one time database migration, may take a few minutes..." + EOL)
const width = 36

View File

@@ -25,7 +25,7 @@ import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -471,25 +471,24 @@ export namespace MCP {
Effect.catch(() => Effect.succeed([] as number[])),
)
function watch(s: State, name: string, client: MCPClient, timeout?: number) {
function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
const listed = await bridge.promise(defs(name, client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
s.defs[name] = listed
await Effect.runPromise(
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
)
await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})
}
const state = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () {
const cfg = yield* cfgSvc.get()
const bridge = yield* EffectBridge.make()
const config = cfg.mcp ?? {}
const s: State = {
status: {},
@@ -518,7 +517,7 @@ export namespace MCP {
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout)
watch(s, key, result.mcpClient, bridge, mcp.timeout)
}
}),
{ concurrency: "unbounded" },
@@ -565,11 +564,12 @@ export namespace MCP {
listed: MCPToolDef[],
timeout?: number,
) {
const bridge = yield* EffectBridge.make()
yield* closeClient(s, name)
s.status[name] = { status: "connected" }
s.clients[name] = client
s.defs[name] = listed
watch(s, name, client, timeout)
watch(s, name, client, bridge, timeout)
return s.status[name]
})

View File

@@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { PoeAuthPlugin } from "opencode-poe-auth"
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
import { Effect, Layer, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { errorMessage } from "@/util/error"
import { PluginLoader } from "./loader"
@@ -90,14 +90,6 @@ export namespace Plugin {
return result
}
function publishPluginError(bus: Bus.Interface, message: string) {
Effect.runFork(
bus
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
.pipe(Effect.provide(EffectLogger.layer)),
)
}
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
if (plugin) {
@@ -120,6 +112,11 @@ export namespace Plugin {
const state = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
const bridge = yield* EffectBridge.make()
function publishPluginError(message: string) {
bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
}
const { Server } = yield* Effect.promise(() => import("../server/server"))
@@ -187,24 +184,24 @@ export namespace Plugin {
if (stage === "install") {
const parsed = parsePluginSpecifier(spec)
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
return
}
if (stage === "compatibility") {
log.warn("plugin incompatible", { path: spec, error: message })
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
publishPluginError(`Plugin ${spec} skipped: ${message}`)
return
}
if (stage === "entry") {
log.error("failed to resolve plugin server entry", { path: spec, error: message })
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
return
}
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
},
},
}),

View File

@@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
import { Global } from "../global"
import path from "path"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { isRecord } from "@/util/record"
@@ -1043,6 +1043,7 @@ export namespace Provider {
const state = yield* InstanceState.make<State>(() =>
Effect.gen(function* () {
using _ = log.time("state")
const bridge = yield* EffectBridge.make()
const cfg = yield* config.get()
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
const database = mapValues(modelsDev, fromModelsDevProvider)
@@ -1223,8 +1224,7 @@ export namespace Provider {
const options = yield* Effect.promise(() =>
plugin.auth!.loader!(
() =>
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
() => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
database[plugin.auth!.provider],
),
)

View File

@@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -173,6 +173,7 @@ export namespace Pty {
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
const s = yield* InstanceState.get(state)
const bridge = yield* EffectBridge.make()
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
@@ -256,8 +257,8 @@ export namespace Pty {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
bridge.fork(remove(id))
}),
)
yield* bus.publish(Event.Created, { info })

View File

@@ -20,13 +20,12 @@ import { Wildcard } from "@/util/wildcard"
import { SessionID } from "@/session/schema"
import { Auth } from "@/auth"
import { Installation } from "@/installation"
import { makeRuntime } from "@/effect/run-service"
import { EffectBridge } from "@/effect/bridge"
import * as Option from "effect/Option"
import * as OtelTracer from "@effect/opentelemetry/Tracer"
export namespace LLM {
const log = Log.create({ service: "llm" })
const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
type Result = Awaited<ReturnType<typeof streamText>>
@@ -57,369 +56,371 @@ export namespace LLM {
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
Layer.effect(
Service,
Effect.gen(function* () {
const auth = yield* Auth.Service
const config = yield* Config.Service
const provider = yield* Provider.Service
const plugin = yield* Plugin.Service
const live: Layer.Layer<
Service,
never,
Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
const auth = yield* Auth.Service
const config = yield* Config.Service
const provider = yield* Provider.Service
const plugin = yield* Plugin.Service
const perm = yield* Permission.Service
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
const l = log
.clone()
.tag("providerID", input.model.providerID)
.tag("modelID", input.model.id)
.tag("sessionID", input.sessionID)
.tag("small", (input.small ?? false).toString())
.tag("agent", input.agent.name)
.tag("mode", input.agent.mode)
l.info("stream", {
modelID: input.model.id,
providerID: input.model.providerID,
})
const [language, cfg, item, info] = yield* Effect.all(
[
provider.getLanguage(input.model),
config.get(),
provider.getProvider(input.model.providerID),
auth.get(input.model.providerID),
],
{ concurrency: "unbounded" },
)
// TODO: move this to a proper hook
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
const system: string[] = []
system.push(
[
// use agent prompt otherwise provider prompt
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
// any custom prompt passed into this call
...input.system,
// any custom prompt from last user message
...(input.user.system ? [input.user.system] : []),
]
.filter((x) => x)
.join("\n"),
)
const header = system[0]
yield* plugin.trigger(
"experimental.chat.system.transform",
{ sessionID: input.sessionID, model: input.model },
{ system },
)
// rejoin to maintain 2-part structure for caching if header unchanged
if (system.length > 2 && system[0] === header) {
const rest = system.slice(1)
system.length = 0
system.push(header, rest.join("\n"))
}
const variant =
!input.small && input.model.variants && input.user.model.variant
? input.model.variants[input.user.model.variant]
: {}
const base = input.small
? ProviderTransform.smallOptions(input.model)
: ProviderTransform.options({
model: input.model,
sessionID: input.sessionID,
providerOptions: item.options,
})
const options: Record<string, any> = pipe(
base,
mergeDeep(input.model.options),
mergeDeep(input.agent.options),
mergeDeep(variant),
)
if (isOpenaiOauth) {
options.instructions = system.join("\n")
}
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
const messages = isOpenaiOauth
? input.messages
: isWorkflow
? input.messages
: [
...system.map(
(x): ModelMessage => ({
role: "system",
content: x,
}),
),
...input.messages,
]
const params = yield* plugin.trigger(
"chat.params",
{
sessionID: input.sessionID,
agent: input.agent.name,
model: input.model,
provider: item,
message: input.user,
},
{
temperature: input.model.capabilities.temperature
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
: undefined,
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
topK: ProviderTransform.topK(input.model),
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
options,
},
)
const { headers } = yield* plugin.trigger(
"chat.headers",
{
sessionID: input.sessionID,
agent: input.agent.name,
model: input.model,
provider: item,
message: input.user,
},
{
headers: {},
},
)
const tools = resolveTools(input)
// LiteLLM and some Anthropic proxies require the tools parameter to be present
// when message history contains tool calls, even if no tools are being used.
// Add a dummy tool that is never called to satisfy this validation.
// This is enabled for:
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
const isLiteLLMProxy =
item.options?.["litellmProxy"] === true ||
input.model.providerID.toLowerCase().includes("litellm") ||
input.model.api.id.toLowerCase().includes("litellm")
// LiteLLM/Bedrock rejects requests where the message history contains tool
// calls but no tools param is present. When there are no active tools (e.g.
// during compaction), inject a stub tool to satisfy the validation requirement.
// The stub description explicitly tells the model not to call it.
if (
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
Object.keys(tools).length === 0 &&
hasToolCalls(input.messages)
) {
tools["_noop"] = tool({
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
inputSchema: jsonSchema({
type: "object",
properties: {
reason: { type: "string", description: "Unused" },
},
}),
execute: async () => ({ output: "", title: "", metadata: {} }),
})
}
// Wire up toolExecutor for DWS workflow models so that tool calls
// from the workflow service are executed via opencode's tool system
// and results sent back over the WebSocket.
if (language instanceof GitLabWorkflowLanguageModel) {
const workflowModel = language as GitLabWorkflowLanguageModel & {
sessionID?: string
sessionPreapprovedTools?: string[]
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
}
workflowModel.sessionID = input.sessionID
workflowModel.systemPrompt = system.join("\n")
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
const t = tools[toolName]
if (!t || !t.execute) {
return { result: "", error: `Unknown tool: ${toolName}` }
}
try {
const result = await t.execute!(JSON.parse(argsJson), {
toolCallId: _requestID,
messages: input.messages,
abortSignal: input.abort,
})
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
return {
result: output,
metadata: typeof result === "object" ? result?.metadata : undefined,
title: typeof result === "object" ? result?.title : undefined,
}
} catch (e: any) {
return { result: "", error: e.message ?? String(e) }
}
}
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
return !match || match.action !== "ask"
})
const approvedToolsForSession = new Set<string>()
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
// Auto-approve tools that were already approved in this session
// (prevents infinite approval loops for server-side MCP tools)
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
return { approved: true }
}
const id = PermissionID.ascending()
let reply: Permission.Reply | undefined
let unsub: (() => void) | undefined
try {
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
if (evt.properties.requestID === id) reply = evt.properties.reply
})
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
try {
const parsed = JSON.parse(t.args) as Record<string, unknown>
const title = (parsed?.title ?? parsed?.name ?? "") as string
return title ? `${t.name}: ${title}` : t.name
} catch {
return t.name
}
})
const uniquePatterns = [...new Set(toolPatterns)] as string[]
await perms.runPromise((svc) =>
svc.ask({
id,
sessionID: SessionID.make(input.sessionID),
permission: "workflow_tool_approval",
patterns: uniquePatterns,
metadata: { tools: approvalTools },
always: uniquePatterns,
ruleset: [],
}),
)
for (const name of uniqueNames) approvedToolsForSession.add(name)
workflowModel.sessionPreapprovedTools = [
...(workflowModel.sessionPreapprovedTools ?? []),
...uniqueNames,
]
return { approved: true }
} catch {
return { approved: false }
} finally {
unsub?.()
}
})
}
const tracer = cfg.experimental?.openTelemetry
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
: undefined
return streamText({
onError(error) {
l.error("stream error", {
error,
})
},
async experimental_repairToolCall(failed) {
const lower = failed.toolCall.toolName.toLowerCase()
if (lower !== failed.toolCall.toolName && tools[lower]) {
l.info("repairing tool call", {
tool: failed.toolCall.toolName,
repaired: lower,
})
return {
...failed.toolCall,
toolName: lower,
}
}
return {
...failed.toolCall,
input: JSON.stringify({
tool: failed.toolCall.toolName,
error: failed.error.message,
}),
toolName: "invalid",
}
},
temperature: params.temperature,
topP: params.topP,
topK: params.topK,
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
tools,
toolChoice: input.toolChoice,
maxOutputTokens: params.maxOutputTokens,
abortSignal: input.abort,
headers: {
...(input.model.providerID.startsWith("opencode")
? {
"x-opencode-project": Instance.project.id,
"x-opencode-session": input.sessionID,
"x-opencode-request": input.user.id,
"x-opencode-client": Flag.OPENCODE_CLIENT,
}
: {
"x-session-affinity": input.sessionID,
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
"User-Agent": `opencode/${Installation.VERSION}`,
}),
...input.model.headers,
...headers,
},
maxRetries: input.retries ?? 0,
messages,
model: wrapLanguageModel({
model: language,
middleware: [
{
specificationVersion: "v3" as const,
async transformParams(args) {
if (args.type === "stream") {
// @ts-expect-error
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
}
return args.params
},
},
],
}),
experimental_telemetry: {
isEnabled: cfg.experimental?.openTelemetry,
functionId: "session.llm",
tracer,
metadata: {
userId: cfg.username ?? "unknown",
sessionId: input.sessionID,
},
},
})
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
const l = log
.clone()
.tag("providerID", input.model.providerID)
.tag("modelID", input.model.id)
.tag("sessionID", input.sessionID)
.tag("small", (input.small ?? false).toString())
.tag("agent", input.agent.name)
.tag("mode", input.agent.mode)
l.info("stream", {
modelID: input.model.id,
providerID: input.model.providerID,
})
const stream: Interface["stream"] = (input) =>
Stream.scoped(
Stream.unwrap(
Effect.gen(function* () {
const ctrl = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()),
(ctrl) => Effect.sync(() => ctrl.abort()),
)
const [language, cfg, item, info] = yield* Effect.all(
[
provider.getLanguage(input.model),
config.get(),
provider.getProvider(input.model.providerID),
auth.get(input.model.providerID),
],
{ concurrency: "unbounded" },
)
const result = yield* run({ ...input, abort: ctrl.signal })
// TODO: move this to a proper hook
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
return Stream.fromAsyncIterable(result.fullStream, (e) =>
e instanceof Error ? e : new Error(String(e)),
)
const system: string[] = []
system.push(
[
// use agent prompt otherwise provider prompt
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
// any custom prompt passed into this call
...input.system,
// any custom prompt from last user message
...(input.user.system ? [input.user.system] : []),
]
.filter((x) => x)
.join("\n"),
)
const header = system[0]
yield* plugin.trigger(
"experimental.chat.system.transform",
{ sessionID: input.sessionID, model: input.model },
{ system },
)
// rejoin to maintain 2-part structure for caching if header unchanged
if (system.length > 2 && system[0] === header) {
const rest = system.slice(1)
system.length = 0
system.push(header, rest.join("\n"))
}
const variant =
!input.small && input.model.variants && input.user.model.variant
? input.model.variants[input.user.model.variant]
: {}
const base = input.small
? ProviderTransform.smallOptions(input.model)
: ProviderTransform.options({
model: input.model,
sessionID: input.sessionID,
providerOptions: item.options,
})
const options: Record<string, any> = pipe(
base,
mergeDeep(input.model.options),
mergeDeep(input.agent.options),
mergeDeep(variant),
)
if (isOpenaiOauth) {
options.instructions = system.join("\n")
}
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
const messages = isOpenaiOauth
? input.messages
: isWorkflow
? input.messages
: [
...system.map(
(x): ModelMessage => ({
role: "system",
content: x,
}),
),
...input.messages,
]
const params = yield* plugin.trigger(
"chat.params",
{
sessionID: input.sessionID,
agent: input.agent.name,
model: input.model,
provider: item,
message: input.user,
},
{
temperature: input.model.capabilities.temperature
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
: undefined,
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
topK: ProviderTransform.topK(input.model),
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
options,
},
)
const { headers } = yield* plugin.trigger(
"chat.headers",
{
sessionID: input.sessionID,
agent: input.agent.name,
model: input.model,
provider: item,
message: input.user,
},
{
headers: {},
},
)
const tools = resolveTools(input)
// LiteLLM and some Anthropic proxies require the tools parameter to be present
// when message history contains tool calls, even if no tools are being used.
// Add a dummy tool that is never called to satisfy this validation.
// This is enabled for:
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
const isLiteLLMProxy =
item.options?.["litellmProxy"] === true ||
input.model.providerID.toLowerCase().includes("litellm") ||
input.model.api.id.toLowerCase().includes("litellm")
// LiteLLM/Bedrock rejects requests where the message history contains tool
// calls but no tools param is present. When there are no active tools (e.g.
// during compaction), inject a stub tool to satisfy the validation requirement.
// The stub description explicitly tells the model not to call it.
if (
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
Object.keys(tools).length === 0 &&
hasToolCalls(input.messages)
) {
tools["_noop"] = tool({
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
inputSchema: jsonSchema({
type: "object",
properties: {
reason: { type: "string", description: "Unused" },
},
}),
execute: async () => ({ output: "", title: "", metadata: {} }),
})
}
// Wire up toolExecutor for DWS workflow models so that tool calls
// from the workflow service are executed via opencode's tool system
// and results sent back over the WebSocket.
if (language instanceof GitLabWorkflowLanguageModel) {
const workflowModel = language as GitLabWorkflowLanguageModel & {
sessionID?: string
sessionPreapprovedTools?: string[]
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
}
workflowModel.sessionID = input.sessionID
workflowModel.systemPrompt = system.join("\n")
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
const t = tools[toolName]
if (!t || !t.execute) {
return { result: "", error: `Unknown tool: ${toolName}` }
}
try {
const result = await t.execute!(JSON.parse(argsJson), {
toolCallId: _requestID,
messages: input.messages,
abortSignal: input.abort,
})
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
return {
result: output,
metadata: typeof result === "object" ? result?.metadata : undefined,
title: typeof result === "object" ? result?.title : undefined,
}
} catch (e: any) {
return { result: "", error: e.message ?? String(e) }
}
}
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
return !match || match.action !== "ask"
})
const bridge = yield* EffectBridge.make()
const approvedToolsForSession = new Set<string>()
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
// Auto-approve tools that were already approved in this session
// (prevents infinite approval loops for server-side MCP tools)
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
return { approved: true }
}
const id = PermissionID.ascending()
let reply: Permission.Reply | undefined
let unsub: (() => void) | undefined
try {
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
if (evt.properties.requestID === id) reply = evt.properties.reply
})
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
try {
const parsed = JSON.parse(t.args) as Record<string, unknown>
const title = (parsed?.title ?? parsed?.name ?? "") as string
return title ? `${t.name}: ${title}` : t.name
} catch {
return t.name
}
})
const uniquePatterns = [...new Set(toolPatterns)] as string[]
await bridge.promise(
perm.ask({
id,
sessionID: SessionID.make(input.sessionID),
permission: "workflow_tool_approval",
patterns: uniquePatterns,
metadata: { tools: approvalTools },
always: uniquePatterns,
ruleset: [],
}),
)
for (const name of uniqueNames) approvedToolsForSession.add(name)
workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
return { approved: true }
} catch {
return { approved: false }
} finally {
unsub?.()
}
})
}
const tracer = cfg.experimental?.openTelemetry
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
: undefined
return streamText({
onError(error) {
l.error("stream error", {
error,
})
},
async experimental_repairToolCall(failed) {
const lower = failed.toolCall.toolName.toLowerCase()
if (lower !== failed.toolCall.toolName && tools[lower]) {
l.info("repairing tool call", {
tool: failed.toolCall.toolName,
repaired: lower,
})
return {
...failed.toolCall,
toolName: lower,
}
}
return {
...failed.toolCall,
input: JSON.stringify({
tool: failed.toolCall.toolName,
error: failed.error.message,
}),
),
)
toolName: "invalid",
}
},
temperature: params.temperature,
topP: params.topP,
topK: params.topK,
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
tools,
toolChoice: input.toolChoice,
maxOutputTokens: params.maxOutputTokens,
abortSignal: input.abort,
headers: {
...(input.model.providerID.startsWith("opencode")
? {
"x-opencode-project": Instance.project.id,
"x-opencode-session": input.sessionID,
"x-opencode-request": input.user.id,
"x-opencode-client": Flag.OPENCODE_CLIENT,
}
: {
"x-session-affinity": input.sessionID,
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
"User-Agent": `opencode/${Installation.VERSION}`,
}),
...input.model.headers,
...headers,
},
maxRetries: input.retries ?? 0,
messages,
model: wrapLanguageModel({
model: language,
middleware: [
{
specificationVersion: "v3" as const,
async transformParams(args) {
if (args.type === "stream") {
// @ts-expect-error
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
}
return args.params
},
},
],
}),
experimental_telemetry: {
isEnabled: cfg.experimental?.openTelemetry,
functionId: "session.llm",
tracer,
metadata: {
userId: cfg.username ?? "unknown",
sessionId: input.sessionID,
},
},
})
})
return Service.of({ stream })
}),
)
const stream: Interface["stream"] = (input) =>
Stream.scoped(
Stream.unwrap(
Effect.gen(function* () {
const ctrl = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()),
(ctrl) => Effect.sync(() => ctrl.abort()),
)
const result = yield* run({ ...input, abort: ctrl.signal })
return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
}),
),
)
return Service.of({ stream })
}),
)
export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
export const defaultLayer = Layer.suspend(() =>
layer.pipe(

View File

@@ -48,6 +48,7 @@ import { EffectLogger } from "@/effect/logger"
import { InstanceState } from "@/effect/instance-state"
import { TaskTool, type TaskPromptOps } from "@/tool/task"
import { SessionRunState } from "./run-state"
import { EffectBridge } from "@/effect/bridge"
// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -105,11 +106,7 @@ export namespace SessionPrompt {
const sys = yield* SystemPrompt.Service
const llm = yield* LLM.Service
const runner = Effect.fn("SessionPrompt.runner")(function* () {
const ctx = yield* Effect.context()
return {
promise: <A, E>(effect: Effect.Effect<A, E>) => Effect.runPromiseWith(ctx)(effect),
fork: <A, E>(effect: Effect.Effect<A, E>) => Effect.runForkWith(ctx)(effect),
}
return yield* EffectBridge.make()
})
const ops = Effect.fn("SessionPrompt.ops")(function* () {
const run = yield* runner()

View File

@@ -1,6 +1,7 @@
import { expect, test } from "bun:test"
import { Context, Effect, Layer, Logger } from "effect"
import { AppRuntime } from "../../src/effect/app-runtime"
import { EffectBridge } from "../../src/effect/bridge"
import { InstanceRef } from "../../src/effect/instance-ref"
import { EffectLogger } from "../../src/effect/logger"
import { makeRuntime } from "../../src/effect/run-service"
@@ -59,3 +60,33 @@ test("AppRuntime attaches InstanceRef from ALS", async () => {
expect(dir).toBe(tmp.path)
})
test("EffectBridge preserves logger and instance context across async boundaries", async () => {
await using tmp = await tmpdir({ git: true })
const result = await Instance.provide({
directory: tmp.path,
fn: () =>
AppRuntime.runPromise(
Effect.gen(function* () {
const bridge = yield* EffectBridge.make()
return yield* Effect.promise(() =>
Promise.resolve().then(() =>
bridge.promise(
Effect.gen(function* () {
return {
directory: (yield* InstanceRef)?.directory,
...check(yield* Effect.service(Logger.CurrentLoggers)),
}
}),
),
),
)
}),
),
})
expect(result.directory).toBe(tmp.path)
expect(result.effectLogger).toBe(true)
expect(result.defaultLogger).toBe(false)
})

View File

@@ -17,10 +17,11 @@
"dist"
],
"scripts": {
"typecheck": "tsc --noEmit",
"typecheck": "tsgo --noEmit",
"build": "tsc"
},
"devDependencies": {
"@typescript/native-preview": "catalog:",
"typescript": "catalog:"
},
"dependencies": {