mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-16 10:54:52 +00:00
Compare commits
2 Commits
kit/fs-plu
...
example-tu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df4362a719 | ||
|
|
f1751401aa |
25
.opencode/plugins/tui-config-once-toast.tsx
Normal file
25
.opencode/plugins/tui-config-once-toast.tsx
Normal file
@@ -0,0 +1,25 @@
|
||||
import type { TuiPluginModule } from "@opencode-ai/plugin/tui"
|
||||
|
||||
let seen = false
|
||||
|
||||
const plugin: TuiPluginModule & { id: string } = {
|
||||
id: "local.config-once-toast",
|
||||
async tui(api) {
|
||||
if (seen) return
|
||||
|
||||
const cfg = api.state.config
|
||||
if (cfg.plugin !== undefined && !Array.isArray(cfg.plugin)) {
|
||||
throw new Error("Invalid config: plugin must be an array")
|
||||
}
|
||||
|
||||
const mdl = typeof cfg.model === "string" && cfg.model.trim() ? cfg.model : "default"
|
||||
seen = true
|
||||
api.ui.toast({
|
||||
title: "Config check",
|
||||
message: `This is a 1 time toast, validating ur config (model: ${mdl})`,
|
||||
variant: "info",
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
export default plugin
|
||||
@@ -1,6 +1,7 @@
|
||||
{
|
||||
"$schema": "https://opencode.ai/tui.json",
|
||||
"plugin": [
|
||||
"./plugins/tui-config-once-toast.tsx",
|
||||
[
|
||||
"./plugins/tui-smoke.tsx",
|
||||
{
|
||||
|
||||
1
bun.lock
1
bun.lock
@@ -510,6 +510,7 @@
|
||||
"effect": "catalog:",
|
||||
},
|
||||
"devDependencies": {
|
||||
"@typescript/native-preview": "catalog:",
|
||||
"typescript": "catalog:",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import z from "zod"
|
||||
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { Log } from "../util/log"
|
||||
import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
@@ -128,6 +128,7 @@ export namespace Bus {
|
||||
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
|
||||
return Effect.gen(function* () {
|
||||
log.info("subscribing", { type })
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const scope = yield* Scope.make()
|
||||
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
|
||||
|
||||
@@ -147,7 +148,7 @@ export namespace Bus {
|
||||
|
||||
return () => {
|
||||
log.info("unsubscribing", { type })
|
||||
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
|
||||
bridge.fork(Scope.close(scope, Exit.void))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import z from "zod"
|
||||
import { Config } from "../config/config"
|
||||
import { MCP } from "../mcp"
|
||||
@@ -82,6 +82,7 @@ export namespace Command {
|
||||
|
||||
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
|
||||
const cfg = yield* config.get()
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const commands: Record<string, Info> = {}
|
||||
|
||||
commands[Default.INIT] = {
|
||||
@@ -125,7 +126,7 @@ export namespace Command {
|
||||
source: "mcp",
|
||||
description: prompt.description,
|
||||
get template() {
|
||||
return Effect.runPromise(
|
||||
return bridge.promise(
|
||||
mcp
|
||||
.getPrompt(
|
||||
prompt.client,
|
||||
@@ -141,7 +142,6 @@ export namespace Command {
|
||||
.map((message) => (message.content.type === "text" ? message.content.text : ""))
|
||||
.join("\n") || "",
|
||||
),
|
||||
Effect.provide(EffectLogger.layer),
|
||||
),
|
||||
)
|
||||
},
|
||||
|
||||
@@ -12,6 +12,10 @@ export const WorkspaceContext = {
|
||||
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
|
||||
},
|
||||
|
||||
restore<R>(workspaceID: string, fn: () => R): R {
|
||||
return context.provide({ workspaceID }, fn)
|
||||
},
|
||||
|
||||
get workspaceID() {
|
||||
try {
|
||||
return context.use().workspaceID
|
||||
|
||||
49
packages/opencode/src/effect/bridge.ts
Normal file
49
packages/opencode/src/effect/bridge.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { Effect, Fiber } from "effect"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { Instance, type InstanceContext } from "@/project/instance"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { attachWith } from "./run-service"
|
||||
|
||||
export namespace EffectBridge {
|
||||
export interface Shape {
|
||||
readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
|
||||
readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
|
||||
}
|
||||
|
||||
function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
|
||||
if (instance && workspace !== undefined) {
|
||||
return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
|
||||
}
|
||||
if (instance) return Instance.restore(instance, fn)
|
||||
if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
|
||||
return fn()
|
||||
}
|
||||
|
||||
export function make(): Effect.Effect<Shape> {
|
||||
return Effect.gen(function* () {
|
||||
const ctx = yield* Effect.context()
|
||||
const value = yield* InstanceRef
|
||||
const instance =
|
||||
value ??
|
||||
(() => {
|
||||
try {
|
||||
return Instance.current
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
})()
|
||||
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
|
||||
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
|
||||
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>
|
||||
|
||||
return {
|
||||
promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
|
||||
fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
restore(instance, workspace, () => Effect.runFork(wrap(effect))),
|
||||
} satisfies Shape
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { Observability } from "./observability"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
|
||||
export const memoMap = Layer.makeMemoMapUnsafe()
|
||||
|
||||
type Refs = {
|
||||
instance?: InstanceContext
|
||||
workspace?: string
|
||||
}
|
||||
|
||||
export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
|
||||
if (!refs.instance && !refs.workspace) return effect
|
||||
if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
|
||||
if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
|
||||
return effect.pipe(
|
||||
Effect.provideService(InstanceRef, refs.instance),
|
||||
Effect.provideService(WorkspaceRef, refs.workspace),
|
||||
)
|
||||
}
|
||||
|
||||
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
try {
|
||||
const ctx = Instance.current
|
||||
const workspaceID = WorkspaceContext.workspaceID
|
||||
return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
|
||||
return attachWith(effect, {
|
||||
instance: Instance.current,
|
||||
workspace: WorkspaceContext.workspaceID,
|
||||
})
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ import { Bus } from "@/bus"
|
||||
import { TuiEvent } from "@/cli/cmd/tui/event"
|
||||
import open from "open"
|
||||
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
@@ -471,25 +471,24 @@ export namespace MCP {
|
||||
Effect.catch(() => Effect.succeed([] as number[])),
|
||||
)
|
||||
|
||||
function watch(s: State, name: string, client: MCPClient, timeout?: number) {
|
||||
function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
|
||||
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
|
||||
log.info("tools list changed notification received", { server: name })
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
|
||||
const listed = await bridge.promise(defs(name, client, timeout))
|
||||
if (!listed) return
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
s.defs[name] = listed
|
||||
await Effect.runPromise(
|
||||
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
|
||||
})
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("MCP.state")(function* () {
|
||||
const cfg = yield* cfgSvc.get()
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const config = cfg.mcp ?? {}
|
||||
const s: State = {
|
||||
status: {},
|
||||
@@ -518,7 +517,7 @@ export namespace MCP {
|
||||
if (result.mcpClient) {
|
||||
s.clients[key] = result.mcpClient
|
||||
s.defs[key] = result.defs!
|
||||
watch(s, key, result.mcpClient, mcp.timeout)
|
||||
watch(s, key, result.mcpClient, bridge, mcp.timeout)
|
||||
}
|
||||
}),
|
||||
{ concurrency: "unbounded" },
|
||||
@@ -565,11 +564,12 @@ export namespace MCP {
|
||||
listed: MCPToolDef[],
|
||||
timeout?: number,
|
||||
) {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
yield* closeClient(s, name)
|
||||
s.status[name] = { status: "connected" }
|
||||
s.clients[name] = client
|
||||
s.defs[name] = listed
|
||||
watch(s, name, client, timeout)
|
||||
watch(s, name, client, bridge, timeout)
|
||||
return s.status[name]
|
||||
})
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
|
||||
import { PoeAuthPlugin } from "opencode-poe-auth"
|
||||
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
|
||||
import { Effect, Layer, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { errorMessage } from "@/util/error"
|
||||
import { PluginLoader } from "./loader"
|
||||
@@ -90,14 +90,6 @@ export namespace Plugin {
|
||||
return result
|
||||
}
|
||||
|
||||
function publishPluginError(bus: Bus.Interface, message: string) {
|
||||
Effect.runFork(
|
||||
bus
|
||||
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
|
||||
.pipe(Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
}
|
||||
|
||||
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
|
||||
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
|
||||
if (plugin) {
|
||||
@@ -120,6 +112,11 @@ export namespace Plugin {
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Plugin.state")(function* (ctx) {
|
||||
const hooks: Hooks[] = []
|
||||
const bridge = yield* EffectBridge.make()
|
||||
|
||||
function publishPluginError(message: string) {
|
||||
bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
|
||||
}
|
||||
|
||||
const { Server } = yield* Effect.promise(() => import("../server/server"))
|
||||
|
||||
@@ -187,24 +184,24 @@ export namespace Plugin {
|
||||
if (stage === "install") {
|
||||
const parsed = parsePluginSpecifier(spec)
|
||||
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
|
||||
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "compatibility") {
|
||||
log.warn("plugin incompatible", { path: spec, error: message })
|
||||
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
|
||||
publishPluginError(`Plugin ${spec} skipped: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "entry") {
|
||||
log.error("failed to resolve plugin server entry", { path: spec, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
},
|
||||
},
|
||||
}),
|
||||
|
||||
@@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
|
||||
import { Global } from "../global"
|
||||
import path from "path"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
|
||||
import { isRecord } from "@/util/record"
|
||||
@@ -1043,6 +1043,7 @@ export namespace Provider {
|
||||
const state = yield* InstanceState.make<State>(() =>
|
||||
Effect.gen(function* () {
|
||||
using _ = log.time("state")
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const cfg = yield* config.get()
|
||||
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
|
||||
const database = mapValues(modelsDev, fromModelsDevProvider)
|
||||
@@ -1223,8 +1224,7 @@ export namespace Provider {
|
||||
|
||||
const options = yield* Effect.promise(() =>
|
||||
plugin.auth!.loader!(
|
||||
() =>
|
||||
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
|
||||
() => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
|
||||
database[plugin.auth!.provider],
|
||||
),
|
||||
)
|
||||
|
||||
@@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { PtyID } from "./schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
|
||||
export namespace Pty {
|
||||
const log = Log.create({ service: "pty" })
|
||||
@@ -173,6 +173,7 @@ export namespace Pty {
|
||||
|
||||
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const id = PtyID.ascending()
|
||||
const command = input.command || Shell.preferred()
|
||||
const args = input.args || []
|
||||
@@ -256,8 +257,8 @@ export namespace Pty {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
|
||||
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
|
||||
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
|
||||
bridge.fork(remove(id))
|
||||
}),
|
||||
)
|
||||
yield* bus.publish(Event.Created, { info })
|
||||
|
||||
@@ -20,13 +20,12 @@ import { Wildcard } from "@/util/wildcard"
|
||||
import { SessionID } from "@/session/schema"
|
||||
import { Auth } from "@/auth"
|
||||
import { Installation } from "@/installation"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import * as Option from "effect/Option"
|
||||
import * as OtelTracer from "@effect/opentelemetry/Tracer"
|
||||
|
||||
export namespace LLM {
|
||||
const log = Log.create({ service: "llm" })
|
||||
const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
|
||||
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
|
||||
type Result = Awaited<ReturnType<typeof streamText>>
|
||||
|
||||
@@ -57,369 +56,371 @@ export namespace LLM {
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
|
||||
|
||||
export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
|
||||
Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const auth = yield* Auth.Service
|
||||
const config = yield* Config.Service
|
||||
const provider = yield* Provider.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const live: Layer.Layer<
|
||||
Service,
|
||||
never,
|
||||
Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service
|
||||
> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const auth = yield* Auth.Service
|
||||
const config = yield* Config.Service
|
||||
const provider = yield* Provider.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const perm = yield* Permission.Service
|
||||
|
||||
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
|
||||
const l = log
|
||||
.clone()
|
||||
.tag("providerID", input.model.providerID)
|
||||
.tag("modelID", input.model.id)
|
||||
.tag("sessionID", input.sessionID)
|
||||
.tag("small", (input.small ?? false).toString())
|
||||
.tag("agent", input.agent.name)
|
||||
.tag("mode", input.agent.mode)
|
||||
l.info("stream", {
|
||||
modelID: input.model.id,
|
||||
providerID: input.model.providerID,
|
||||
})
|
||||
|
||||
const [language, cfg, item, info] = yield* Effect.all(
|
||||
[
|
||||
provider.getLanguage(input.model),
|
||||
config.get(),
|
||||
provider.getProvider(input.model.providerID),
|
||||
auth.get(input.model.providerID),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
// TODO: move this to a proper hook
|
||||
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
|
||||
|
||||
const system: string[] = []
|
||||
system.push(
|
||||
[
|
||||
// use agent prompt otherwise provider prompt
|
||||
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
|
||||
// any custom prompt passed into this call
|
||||
...input.system,
|
||||
// any custom prompt from last user message
|
||||
...(input.user.system ? [input.user.system] : []),
|
||||
]
|
||||
.filter((x) => x)
|
||||
.join("\n"),
|
||||
)
|
||||
|
||||
const header = system[0]
|
||||
yield* plugin.trigger(
|
||||
"experimental.chat.system.transform",
|
||||
{ sessionID: input.sessionID, model: input.model },
|
||||
{ system },
|
||||
)
|
||||
// rejoin to maintain 2-part structure for caching if header unchanged
|
||||
if (system.length > 2 && system[0] === header) {
|
||||
const rest = system.slice(1)
|
||||
system.length = 0
|
||||
system.push(header, rest.join("\n"))
|
||||
}
|
||||
|
||||
const variant =
|
||||
!input.small && input.model.variants && input.user.model.variant
|
||||
? input.model.variants[input.user.model.variant]
|
||||
: {}
|
||||
const base = input.small
|
||||
? ProviderTransform.smallOptions(input.model)
|
||||
: ProviderTransform.options({
|
||||
model: input.model,
|
||||
sessionID: input.sessionID,
|
||||
providerOptions: item.options,
|
||||
})
|
||||
const options: Record<string, any> = pipe(
|
||||
base,
|
||||
mergeDeep(input.model.options),
|
||||
mergeDeep(input.agent.options),
|
||||
mergeDeep(variant),
|
||||
)
|
||||
if (isOpenaiOauth) {
|
||||
options.instructions = system.join("\n")
|
||||
}
|
||||
|
||||
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
|
||||
const messages = isOpenaiOauth
|
||||
? input.messages
|
||||
: isWorkflow
|
||||
? input.messages
|
||||
: [
|
||||
...system.map(
|
||||
(x): ModelMessage => ({
|
||||
role: "system",
|
||||
content: x,
|
||||
}),
|
||||
),
|
||||
...input.messages,
|
||||
]
|
||||
|
||||
const params = yield* plugin.trigger(
|
||||
"chat.params",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
temperature: input.model.capabilities.temperature
|
||||
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
|
||||
: undefined,
|
||||
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
|
||||
topK: ProviderTransform.topK(input.model),
|
||||
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
|
||||
options,
|
||||
},
|
||||
)
|
||||
|
||||
const { headers } = yield* plugin.trigger(
|
||||
"chat.headers",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
headers: {},
|
||||
},
|
||||
)
|
||||
|
||||
const tools = resolveTools(input)
|
||||
|
||||
// LiteLLM and some Anthropic proxies require the tools parameter to be present
|
||||
// when message history contains tool calls, even if no tools are being used.
|
||||
// Add a dummy tool that is never called to satisfy this validation.
|
||||
// This is enabled for:
|
||||
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
|
||||
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
|
||||
const isLiteLLMProxy =
|
||||
item.options?.["litellmProxy"] === true ||
|
||||
input.model.providerID.toLowerCase().includes("litellm") ||
|
||||
input.model.api.id.toLowerCase().includes("litellm")
|
||||
|
||||
// LiteLLM/Bedrock rejects requests where the message history contains tool
|
||||
// calls but no tools param is present. When there are no active tools (e.g.
|
||||
// during compaction), inject a stub tool to satisfy the validation requirement.
|
||||
// The stub description explicitly tells the model not to call it.
|
||||
if (
|
||||
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
|
||||
Object.keys(tools).length === 0 &&
|
||||
hasToolCalls(input.messages)
|
||||
) {
|
||||
tools["_noop"] = tool({
|
||||
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
|
||||
inputSchema: jsonSchema({
|
||||
type: "object",
|
||||
properties: {
|
||||
reason: { type: "string", description: "Unused" },
|
||||
},
|
||||
}),
|
||||
execute: async () => ({ output: "", title: "", metadata: {} }),
|
||||
})
|
||||
}
|
||||
|
||||
// Wire up toolExecutor for DWS workflow models so that tool calls
|
||||
// from the workflow service are executed via opencode's tool system
|
||||
// and results sent back over the WebSocket.
|
||||
if (language instanceof GitLabWorkflowLanguageModel) {
|
||||
const workflowModel = language as GitLabWorkflowLanguageModel & {
|
||||
sessionID?: string
|
||||
sessionPreapprovedTools?: string[]
|
||||
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
|
||||
}
|
||||
workflowModel.sessionID = input.sessionID
|
||||
workflowModel.systemPrompt = system.join("\n")
|
||||
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
|
||||
const t = tools[toolName]
|
||||
if (!t || !t.execute) {
|
||||
return { result: "", error: `Unknown tool: ${toolName}` }
|
||||
}
|
||||
try {
|
||||
const result = await t.execute!(JSON.parse(argsJson), {
|
||||
toolCallId: _requestID,
|
||||
messages: input.messages,
|
||||
abortSignal: input.abort,
|
||||
})
|
||||
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
|
||||
return {
|
||||
result: output,
|
||||
metadata: typeof result === "object" ? result?.metadata : undefined,
|
||||
title: typeof result === "object" ? result?.title : undefined,
|
||||
}
|
||||
} catch (e: any) {
|
||||
return { result: "", error: e.message ?? String(e) }
|
||||
}
|
||||
}
|
||||
|
||||
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
|
||||
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
|
||||
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
|
||||
return !match || match.action !== "ask"
|
||||
})
|
||||
|
||||
const approvedToolsForSession = new Set<string>()
|
||||
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
|
||||
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
|
||||
// Auto-approve tools that were already approved in this session
|
||||
// (prevents infinite approval loops for server-side MCP tools)
|
||||
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
|
||||
return { approved: true }
|
||||
}
|
||||
|
||||
const id = PermissionID.ascending()
|
||||
let reply: Permission.Reply | undefined
|
||||
let unsub: (() => void) | undefined
|
||||
try {
|
||||
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
|
||||
if (evt.properties.requestID === id) reply = evt.properties.reply
|
||||
})
|
||||
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
|
||||
try {
|
||||
const parsed = JSON.parse(t.args) as Record<string, unknown>
|
||||
const title = (parsed?.title ?? parsed?.name ?? "") as string
|
||||
return title ? `${t.name}: ${title}` : t.name
|
||||
} catch {
|
||||
return t.name
|
||||
}
|
||||
})
|
||||
const uniquePatterns = [...new Set(toolPatterns)] as string[]
|
||||
await perms.runPromise((svc) =>
|
||||
svc.ask({
|
||||
id,
|
||||
sessionID: SessionID.make(input.sessionID),
|
||||
permission: "workflow_tool_approval",
|
||||
patterns: uniquePatterns,
|
||||
metadata: { tools: approvalTools },
|
||||
always: uniquePatterns,
|
||||
ruleset: [],
|
||||
}),
|
||||
)
|
||||
for (const name of uniqueNames) approvedToolsForSession.add(name)
|
||||
workflowModel.sessionPreapprovedTools = [
|
||||
...(workflowModel.sessionPreapprovedTools ?? []),
|
||||
...uniqueNames,
|
||||
]
|
||||
return { approved: true }
|
||||
} catch {
|
||||
return { approved: false }
|
||||
} finally {
|
||||
unsub?.()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const tracer = cfg.experimental?.openTelemetry
|
||||
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
|
||||
: undefined
|
||||
|
||||
return streamText({
|
||||
onError(error) {
|
||||
l.error("stream error", {
|
||||
error,
|
||||
})
|
||||
},
|
||||
async experimental_repairToolCall(failed) {
|
||||
const lower = failed.toolCall.toolName.toLowerCase()
|
||||
if (lower !== failed.toolCall.toolName && tools[lower]) {
|
||||
l.info("repairing tool call", {
|
||||
tool: failed.toolCall.toolName,
|
||||
repaired: lower,
|
||||
})
|
||||
return {
|
||||
...failed.toolCall,
|
||||
toolName: lower,
|
||||
}
|
||||
}
|
||||
return {
|
||||
...failed.toolCall,
|
||||
input: JSON.stringify({
|
||||
tool: failed.toolCall.toolName,
|
||||
error: failed.error.message,
|
||||
}),
|
||||
toolName: "invalid",
|
||||
}
|
||||
},
|
||||
temperature: params.temperature,
|
||||
topP: params.topP,
|
||||
topK: params.topK,
|
||||
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
|
||||
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
|
||||
tools,
|
||||
toolChoice: input.toolChoice,
|
||||
maxOutputTokens: params.maxOutputTokens,
|
||||
abortSignal: input.abort,
|
||||
headers: {
|
||||
...(input.model.providerID.startsWith("opencode")
|
||||
? {
|
||||
"x-opencode-project": Instance.project.id,
|
||||
"x-opencode-session": input.sessionID,
|
||||
"x-opencode-request": input.user.id,
|
||||
"x-opencode-client": Flag.OPENCODE_CLIENT,
|
||||
}
|
||||
: {
|
||||
"x-session-affinity": input.sessionID,
|
||||
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
|
||||
"User-Agent": `opencode/${Installation.VERSION}`,
|
||||
}),
|
||||
...input.model.headers,
|
||||
...headers,
|
||||
},
|
||||
maxRetries: input.retries ?? 0,
|
||||
messages,
|
||||
model: wrapLanguageModel({
|
||||
model: language,
|
||||
middleware: [
|
||||
{
|
||||
specificationVersion: "v3" as const,
|
||||
async transformParams(args) {
|
||||
if (args.type === "stream") {
|
||||
// @ts-expect-error
|
||||
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
|
||||
}
|
||||
return args.params
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
experimental_telemetry: {
|
||||
isEnabled: cfg.experimental?.openTelemetry,
|
||||
functionId: "session.llm",
|
||||
tracer,
|
||||
metadata: {
|
||||
userId: cfg.username ?? "unknown",
|
||||
sessionId: input.sessionID,
|
||||
},
|
||||
},
|
||||
})
|
||||
const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
|
||||
const l = log
|
||||
.clone()
|
||||
.tag("providerID", input.model.providerID)
|
||||
.tag("modelID", input.model.id)
|
||||
.tag("sessionID", input.sessionID)
|
||||
.tag("small", (input.small ?? false).toString())
|
||||
.tag("agent", input.agent.name)
|
||||
.tag("mode", input.agent.mode)
|
||||
l.info("stream", {
|
||||
modelID: input.model.id,
|
||||
providerID: input.model.providerID,
|
||||
})
|
||||
|
||||
const stream: Interface["stream"] = (input) =>
|
||||
Stream.scoped(
|
||||
Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ctrl = yield* Effect.acquireRelease(
|
||||
Effect.sync(() => new AbortController()),
|
||||
(ctrl) => Effect.sync(() => ctrl.abort()),
|
||||
)
|
||||
const [language, cfg, item, info] = yield* Effect.all(
|
||||
[
|
||||
provider.getLanguage(input.model),
|
||||
config.get(),
|
||||
provider.getProvider(input.model.providerID),
|
||||
auth.get(input.model.providerID),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
const result = yield* run({ ...input, abort: ctrl.signal })
|
||||
// TODO: move this to a proper hook
|
||||
const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
|
||||
|
||||
return Stream.fromAsyncIterable(result.fullStream, (e) =>
|
||||
e instanceof Error ? e : new Error(String(e)),
|
||||
)
|
||||
const system: string[] = []
|
||||
system.push(
|
||||
[
|
||||
// use agent prompt otherwise provider prompt
|
||||
...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
|
||||
// any custom prompt passed into this call
|
||||
...input.system,
|
||||
// any custom prompt from last user message
|
||||
...(input.user.system ? [input.user.system] : []),
|
||||
]
|
||||
.filter((x) => x)
|
||||
.join("\n"),
|
||||
)
|
||||
|
||||
const header = system[0]
|
||||
yield* plugin.trigger(
|
||||
"experimental.chat.system.transform",
|
||||
{ sessionID: input.sessionID, model: input.model },
|
||||
{ system },
|
||||
)
|
||||
// rejoin to maintain 2-part structure for caching if header unchanged
|
||||
if (system.length > 2 && system[0] === header) {
|
||||
const rest = system.slice(1)
|
||||
system.length = 0
|
||||
system.push(header, rest.join("\n"))
|
||||
}
|
||||
|
||||
const variant =
|
||||
!input.small && input.model.variants && input.user.model.variant
|
||||
? input.model.variants[input.user.model.variant]
|
||||
: {}
|
||||
const base = input.small
|
||||
? ProviderTransform.smallOptions(input.model)
|
||||
: ProviderTransform.options({
|
||||
model: input.model,
|
||||
sessionID: input.sessionID,
|
||||
providerOptions: item.options,
|
||||
})
|
||||
const options: Record<string, any> = pipe(
|
||||
base,
|
||||
mergeDeep(input.model.options),
|
||||
mergeDeep(input.agent.options),
|
||||
mergeDeep(variant),
|
||||
)
|
||||
if (isOpenaiOauth) {
|
||||
options.instructions = system.join("\n")
|
||||
}
|
||||
|
||||
const isWorkflow = language instanceof GitLabWorkflowLanguageModel
|
||||
const messages = isOpenaiOauth
|
||||
? input.messages
|
||||
: isWorkflow
|
||||
? input.messages
|
||||
: [
|
||||
...system.map(
|
||||
(x): ModelMessage => ({
|
||||
role: "system",
|
||||
content: x,
|
||||
}),
|
||||
),
|
||||
...input.messages,
|
||||
]
|
||||
|
||||
const params = yield* plugin.trigger(
|
||||
"chat.params",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
temperature: input.model.capabilities.temperature
|
||||
? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
|
||||
: undefined,
|
||||
topP: input.agent.topP ?? ProviderTransform.topP(input.model),
|
||||
topK: ProviderTransform.topK(input.model),
|
||||
maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
|
||||
options,
|
||||
},
|
||||
)
|
||||
|
||||
const { headers } = yield* plugin.trigger(
|
||||
"chat.headers",
|
||||
{
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent.name,
|
||||
model: input.model,
|
||||
provider: item,
|
||||
message: input.user,
|
||||
},
|
||||
{
|
||||
headers: {},
|
||||
},
|
||||
)
|
||||
|
||||
const tools = resolveTools(input)
|
||||
|
||||
// LiteLLM and some Anthropic proxies require the tools parameter to be present
|
||||
// when message history contains tool calls, even if no tools are being used.
|
||||
// Add a dummy tool that is never called to satisfy this validation.
|
||||
// This is enabled for:
|
||||
// 1. Providers with "litellm" in their ID or API ID (auto-detected)
|
||||
// 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
|
||||
const isLiteLLMProxy =
|
||||
item.options?.["litellmProxy"] === true ||
|
||||
input.model.providerID.toLowerCase().includes("litellm") ||
|
||||
input.model.api.id.toLowerCase().includes("litellm")
|
||||
|
||||
// LiteLLM/Bedrock rejects requests where the message history contains tool
|
||||
// calls but no tools param is present. When there are no active tools (e.g.
|
||||
// during compaction), inject a stub tool to satisfy the validation requirement.
|
||||
// The stub description explicitly tells the model not to call it.
|
||||
if (
|
||||
(isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
|
||||
Object.keys(tools).length === 0 &&
|
||||
hasToolCalls(input.messages)
|
||||
) {
|
||||
tools["_noop"] = tool({
|
||||
description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
|
||||
inputSchema: jsonSchema({
|
||||
type: "object",
|
||||
properties: {
|
||||
reason: { type: "string", description: "Unused" },
|
||||
},
|
||||
}),
|
||||
execute: async () => ({ output: "", title: "", metadata: {} }),
|
||||
})
|
||||
}
|
||||
|
||||
// Wire up toolExecutor for DWS workflow models so that tool calls
|
||||
// from the workflow service are executed via opencode's tool system
|
||||
// and results sent back over the WebSocket.
|
||||
if (language instanceof GitLabWorkflowLanguageModel) {
|
||||
const workflowModel = language as GitLabWorkflowLanguageModel & {
|
||||
sessionID?: string
|
||||
sessionPreapprovedTools?: string[]
|
||||
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
|
||||
}
|
||||
workflowModel.sessionID = input.sessionID
|
||||
workflowModel.systemPrompt = system.join("\n")
|
||||
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
|
||||
const t = tools[toolName]
|
||||
if (!t || !t.execute) {
|
||||
return { result: "", error: `Unknown tool: ${toolName}` }
|
||||
}
|
||||
try {
|
||||
const result = await t.execute!(JSON.parse(argsJson), {
|
||||
toolCallId: _requestID,
|
||||
messages: input.messages,
|
||||
abortSignal: input.abort,
|
||||
})
|
||||
const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
|
||||
return {
|
||||
result: output,
|
||||
metadata: typeof result === "object" ? result?.metadata : undefined,
|
||||
title: typeof result === "object" ? result?.title : undefined,
|
||||
}
|
||||
} catch (e: any) {
|
||||
return { result: "", error: e.message ?? String(e) }
|
||||
}
|
||||
}
|
||||
|
||||
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
|
||||
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
|
||||
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
|
||||
return !match || match.action !== "ask"
|
||||
})
|
||||
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const approvedToolsForSession = new Set<string>()
|
||||
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
|
||||
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
|
||||
// Auto-approve tools that were already approved in this session
|
||||
// (prevents infinite approval loops for server-side MCP tools)
|
||||
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
|
||||
return { approved: true }
|
||||
}
|
||||
|
||||
const id = PermissionID.ascending()
|
||||
let reply: Permission.Reply | undefined
|
||||
let unsub: (() => void) | undefined
|
||||
try {
|
||||
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
|
||||
if (evt.properties.requestID === id) reply = evt.properties.reply
|
||||
})
|
||||
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
|
||||
try {
|
||||
const parsed = JSON.parse(t.args) as Record<string, unknown>
|
||||
const title = (parsed?.title ?? parsed?.name ?? "") as string
|
||||
return title ? `${t.name}: ${title}` : t.name
|
||||
} catch {
|
||||
return t.name
|
||||
}
|
||||
})
|
||||
const uniquePatterns = [...new Set(toolPatterns)] as string[]
|
||||
await bridge.promise(
|
||||
perm.ask({
|
||||
id,
|
||||
sessionID: SessionID.make(input.sessionID),
|
||||
permission: "workflow_tool_approval",
|
||||
patterns: uniquePatterns,
|
||||
metadata: { tools: approvalTools },
|
||||
always: uniquePatterns,
|
||||
ruleset: [],
|
||||
}),
|
||||
)
|
||||
for (const name of uniqueNames) approvedToolsForSession.add(name)
|
||||
workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
|
||||
return { approved: true }
|
||||
} catch {
|
||||
return { approved: false }
|
||||
} finally {
|
||||
unsub?.()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const tracer = cfg.experimental?.openTelemetry
|
||||
? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
|
||||
: undefined
|
||||
|
||||
return streamText({
|
||||
onError(error) {
|
||||
l.error("stream error", {
|
||||
error,
|
||||
})
|
||||
},
|
||||
async experimental_repairToolCall(failed) {
|
||||
const lower = failed.toolCall.toolName.toLowerCase()
|
||||
if (lower !== failed.toolCall.toolName && tools[lower]) {
|
||||
l.info("repairing tool call", {
|
||||
tool: failed.toolCall.toolName,
|
||||
repaired: lower,
|
||||
})
|
||||
return {
|
||||
...failed.toolCall,
|
||||
toolName: lower,
|
||||
}
|
||||
}
|
||||
return {
|
||||
...failed.toolCall,
|
||||
input: JSON.stringify({
|
||||
tool: failed.toolCall.toolName,
|
||||
error: failed.error.message,
|
||||
}),
|
||||
),
|
||||
)
|
||||
toolName: "invalid",
|
||||
}
|
||||
},
|
||||
temperature: params.temperature,
|
||||
topP: params.topP,
|
||||
topK: params.topK,
|
||||
providerOptions: ProviderTransform.providerOptions(input.model, params.options),
|
||||
activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
|
||||
tools,
|
||||
toolChoice: input.toolChoice,
|
||||
maxOutputTokens: params.maxOutputTokens,
|
||||
abortSignal: input.abort,
|
||||
headers: {
|
||||
...(input.model.providerID.startsWith("opencode")
|
||||
? {
|
||||
"x-opencode-project": Instance.project.id,
|
||||
"x-opencode-session": input.sessionID,
|
||||
"x-opencode-request": input.user.id,
|
||||
"x-opencode-client": Flag.OPENCODE_CLIENT,
|
||||
}
|
||||
: {
|
||||
"x-session-affinity": input.sessionID,
|
||||
...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
|
||||
"User-Agent": `opencode/${Installation.VERSION}`,
|
||||
}),
|
||||
...input.model.headers,
|
||||
...headers,
|
||||
},
|
||||
maxRetries: input.retries ?? 0,
|
||||
messages,
|
||||
model: wrapLanguageModel({
|
||||
model: language,
|
||||
middleware: [
|
||||
{
|
||||
specificationVersion: "v3" as const,
|
||||
async transformParams(args) {
|
||||
if (args.type === "stream") {
|
||||
// @ts-expect-error
|
||||
args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
|
||||
}
|
||||
return args.params
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
experimental_telemetry: {
|
||||
isEnabled: cfg.experimental?.openTelemetry,
|
||||
functionId: "session.llm",
|
||||
tracer,
|
||||
metadata: {
|
||||
userId: cfg.username ?? "unknown",
|
||||
sessionId: input.sessionID,
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
return Service.of({ stream })
|
||||
}),
|
||||
)
|
||||
const stream: Interface["stream"] = (input) =>
|
||||
Stream.scoped(
|
||||
Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const ctrl = yield* Effect.acquireRelease(
|
||||
Effect.sync(() => new AbortController()),
|
||||
(ctrl) => Effect.sync(() => ctrl.abort()),
|
||||
)
|
||||
|
||||
const result = yield* run({ ...input, abort: ctrl.signal })
|
||||
|
||||
return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
return Service.of({ stream })
|
||||
}),
|
||||
)
|
||||
|
||||
export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
|
||||
|
||||
export const defaultLayer = Layer.suspend(() =>
|
||||
layer.pipe(
|
||||
|
||||
@@ -48,6 +48,7 @@ import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { TaskTool, type TaskPromptOps } from "@/tool/task"
|
||||
import { SessionRunState } from "./run-state"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
|
||||
// @ts-ignore
|
||||
globalThis.AI_SDK_LOG_WARNINGS = false
|
||||
@@ -105,11 +106,7 @@ export namespace SessionPrompt {
|
||||
const sys = yield* SystemPrompt.Service
|
||||
const llm = yield* LLM.Service
|
||||
const runner = Effect.fn("SessionPrompt.runner")(function* () {
|
||||
const ctx = yield* Effect.context()
|
||||
return {
|
||||
promise: <A, E>(effect: Effect.Effect<A, E>) => Effect.runPromiseWith(ctx)(effect),
|
||||
fork: <A, E>(effect: Effect.Effect<A, E>) => Effect.runForkWith(ctx)(effect),
|
||||
}
|
||||
return yield* EffectBridge.make()
|
||||
})
|
||||
const ops = Effect.fn("SessionPrompt.ops")(function* () {
|
||||
const run = yield* runner()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { expect, test } from "bun:test"
|
||||
import { Context, Effect, Layer, Logger } from "effect"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { EffectBridge } from "../../src/effect/bridge"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { EffectLogger } from "../../src/effect/logger"
|
||||
import { makeRuntime } from "../../src/effect/run-service"
|
||||
@@ -59,3 +60,33 @@ test("AppRuntime attaches InstanceRef from ALS", async () => {
|
||||
|
||||
expect(dir).toBe(tmp.path)
|
||||
})
|
||||
|
||||
test("EffectBridge preserves logger and instance context across async boundaries", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
|
||||
const result = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () =>
|
||||
AppRuntime.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
return yield* Effect.promise(() =>
|
||||
Promise.resolve().then(() =>
|
||||
bridge.promise(
|
||||
Effect.gen(function* () {
|
||||
return {
|
||||
directory: (yield* InstanceRef)?.directory,
|
||||
...check(yield* Effect.service(Logger.CurrentLoggers)),
|
||||
}
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
}),
|
||||
),
|
||||
})
|
||||
|
||||
expect(result.directory).toBe(tmp.path)
|
||||
expect(result.effectLogger).toBe(true)
|
||||
expect(result.defaultLogger).toBe(false)
|
||||
})
|
||||
|
||||
@@ -17,10 +17,11 @@
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"typecheck": "tsc --noEmit",
|
||||
"typecheck": "tsgo --noEmit",
|
||||
"build": "tsc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@typescript/native-preview": "catalog:",
|
||||
"typescript": "catalog:"
|
||||
},
|
||||
"dependencies": {
|
||||
|
||||
Reference in New Issue
Block a user