mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-01 03:24:49 +00:00
Compare commits
17 Commits
v1.3.13
...
test/proce
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d2385ad49 | ||
|
|
7f6a5bb2c8 | ||
|
|
537cc32bf0 | ||
|
|
82da702f64 | ||
|
|
90469bbb7e | ||
|
|
4ff0fbc043 | ||
|
|
e24369eaf1 | ||
|
|
825f51c39f | ||
|
|
191a747405 | ||
|
|
cc412f3014 | ||
|
|
bb039496d5 | ||
|
|
f2fa1a681d | ||
|
|
6bd340492c | ||
|
|
21ec3207e7 | ||
|
|
123123b6c3 | ||
|
|
6ea467b0ac | ||
|
|
459fbc99a8 |
@@ -90,8 +90,9 @@ export namespace Bus {
|
||||
if (ps) yield* PubSub.publish(ps, payload)
|
||||
yield* PubSub.publish(state.wildcard, payload)
|
||||
|
||||
const dir = yield* InstanceState.directory
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
directory: dir,
|
||||
payload,
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1486,7 +1486,8 @@ export namespace Config {
|
||||
})
|
||||
|
||||
const update = Effect.fn("Config.update")(function* (config: Info) {
|
||||
const file = path.join(Instance.directory, "config.json")
|
||||
const dir = yield* InstanceState.directory
|
||||
const file = path.join(dir, "config.json")
|
||||
const existing = yield* loadFile(file)
|
||||
yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie)
|
||||
yield* Effect.promise(() => Instance.dispose())
|
||||
|
||||
6
packages/opencode/src/effect/instance-ref.ts
Normal file
6
packages/opencode/src/effect/instance-ref.ts
Normal file
@@ -0,0 +1,6 @@
|
||||
import { ServiceMap } from "effect"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
|
||||
export const InstanceRef = ServiceMap.Reference<InstanceContext | undefined>("~opencode/InstanceRef", {
|
||||
defaultValue: () => undefined,
|
||||
})
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Effect, ScopedCache, Scope } from "effect"
|
||||
import { Effect, Fiber, ScopedCache, Scope, ServiceMap } from "effect"
|
||||
import { Instance, type InstanceContext } from "@/project/instance"
|
||||
import { Context } from "@/util/context"
|
||||
import { InstanceRef } from "./instance-ref"
|
||||
import { registerDisposer } from "./instance-registry"
|
||||
|
||||
const TypeId = "~opencode/InstanceState"
|
||||
@@ -10,13 +12,34 @@ export interface InstanceState<A, E = never, R = never> {
|
||||
}
|
||||
|
||||
export namespace InstanceState {
|
||||
export const bind = <F extends (...args: any[]) => any>(fn: F): F => {
|
||||
try {
|
||||
return Instance.bind(fn)
|
||||
} catch (err) {
|
||||
if (!(err instanceof Context.NotFound)) throw err
|
||||
}
|
||||
const fiber = Fiber.getCurrent()
|
||||
const ctx = fiber ? ServiceMap.getReferenceUnsafe(fiber.services, InstanceRef) : undefined
|
||||
if (!ctx) return fn
|
||||
return ((...args: any[]) => Instance.restore(ctx, () => fn(...args))) as F
|
||||
}
|
||||
|
||||
export const context = Effect.fnUntraced(function* () {
|
||||
return (yield* InstanceRef) ?? Instance.current
|
||||
})()
|
||||
|
||||
export const directory = Effect.map(context, (ctx) => ctx.directory)
|
||||
|
||||
export const make = <A, E = never, R = never>(
|
||||
init: (ctx: InstanceContext) => Effect.Effect<A, E, R | Scope.Scope>,
|
||||
): Effect.Effect<InstanceState<A, E, Exclude<R, Scope.Scope>>, never, R | Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
const cache = yield* ScopedCache.make<string, A, E, R>({
|
||||
capacity: Number.POSITIVE_INFINITY,
|
||||
lookup: () => init(Instance.current),
|
||||
lookup: () =>
|
||||
Effect.fnUntraced(function* () {
|
||||
return yield* init(yield* context)
|
||||
})(),
|
||||
})
|
||||
|
||||
const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory)))
|
||||
@@ -29,7 +52,9 @@ export namespace InstanceState {
|
||||
})
|
||||
|
||||
export const get = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory))
|
||||
Effect.gen(function* () {
|
||||
return yield* ScopedCache.get(self.cache, yield* directory)
|
||||
})
|
||||
|
||||
export const use = <A, E, R, B>(self: InstanceState<A, E, R>, select: (value: A) => B) =>
|
||||
Effect.map(get(self), select)
|
||||
@@ -40,8 +65,18 @@ export namespace InstanceState {
|
||||
) => Effect.flatMap(get(self), select)
|
||||
|
||||
export const has = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory))
|
||||
Effect.gen(function* () {
|
||||
return yield* ScopedCache.has(self.cache, yield* directory)
|
||||
})
|
||||
|
||||
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
|
||||
Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory))
|
||||
Effect.gen(function* () {
|
||||
return yield* ScopedCache.invalidate(self.cache, yield* directory)
|
||||
})
|
||||
|
||||
/**
|
||||
* Effect finalizers run on the fiber scheduler after the original async
|
||||
* boundary, so ALS reads like Instance.directory can be gone by then.
|
||||
*/
|
||||
export const withALS = <T>(fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn))
|
||||
}
|
||||
|
||||
@@ -1,19 +1,33 @@
|
||||
import { Effect, Layer, ManagedRuntime } from "effect"
|
||||
import * as ServiceMap from "effect/ServiceMap"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Context } from "@/util/context"
|
||||
import { InstanceRef } from "./instance-ref"
|
||||
|
||||
export const memoMap = Layer.makeMemoMapUnsafe()
|
||||
|
||||
function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
try {
|
||||
const ctx = Instance.current
|
||||
return Effect.provideService(effect, InstanceRef, ctx)
|
||||
} catch (err) {
|
||||
if (!(err instanceof Context.NotFound)) throw err
|
||||
}
|
||||
return effect
|
||||
}
|
||||
|
||||
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
|
||||
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
|
||||
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
|
||||
|
||||
return {
|
||||
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
|
||||
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(attach(service.use(fn))),
|
||||
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
|
||||
getRuntime().runPromiseExit(service.use(fn), options),
|
||||
getRuntime().runPromiseExit(attach(service.use(fn)), options),
|
||||
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
|
||||
getRuntime().runPromise(service.use(fn), options),
|
||||
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
|
||||
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
|
||||
getRuntime().runPromise(attach(service.use(fn)), options),
|
||||
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(attach(service.use(fn))),
|
||||
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) =>
|
||||
getRuntime().runCallback(attach(service.use(fn))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,10 +108,11 @@ export namespace Format {
|
||||
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
|
||||
log.info("running", { command: item.command })
|
||||
const cmd = item.command.map((x) => x.replace("$FILE", filepath))
|
||||
const dir = yield* InstanceState.directory
|
||||
const code = yield* spawner
|
||||
.spawn(
|
||||
ChildProcess.make(cmd[0]!, cmd.slice(1), {
|
||||
cwd: Instance.directory,
|
||||
cwd: dir,
|
||||
env: item.environment,
|
||||
extendEnv: true,
|
||||
}),
|
||||
|
||||
@@ -9,11 +9,7 @@ import z from "zod"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { Log } from "../util/log"
|
||||
|
||||
declare global {
|
||||
const OPENCODE_VERSION: string
|
||||
const OPENCODE_CHANNEL: string
|
||||
}
|
||||
import { CHANNEL as channel, VERSION as version } from "./meta"
|
||||
|
||||
import semver from "semver"
|
||||
|
||||
@@ -60,8 +56,8 @@ export namespace Installation {
|
||||
})
|
||||
export type Info = z.infer<typeof Info>
|
||||
|
||||
export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local"
|
||||
export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local"
|
||||
export const VERSION = version
|
||||
export const CHANNEL = channel
|
||||
export const USER_AGENT = `opencode/${CHANNEL}/${VERSION}/${Flag.OPENCODE_CLIENT}`
|
||||
|
||||
export function isPreview() {
|
||||
|
||||
7
packages/opencode/src/installation/meta.ts
Normal file
7
packages/opencode/src/installation/meta.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
declare global {
|
||||
const OPENCODE_VERSION: string
|
||||
const OPENCODE_CHANNEL: string
|
||||
}
|
||||
|
||||
export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local"
|
||||
export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local"
|
||||
@@ -114,6 +114,14 @@ export const Instance = {
|
||||
const ctx = context.use()
|
||||
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
|
||||
},
|
||||
/**
|
||||
* Run a synchronous function within the given instance context ALS.
|
||||
* Use this to bridge from Effect (where InstanceRef carries context)
|
||||
* back to sync code that reads Instance.directory from ALS.
|
||||
*/
|
||||
restore<R>(ctx: InstanceContext, fn: () => R): R {
|
||||
return context.provide(ctx, fn)
|
||||
},
|
||||
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
|
||||
return State.create(() => Instance.directory, init, dispose)
|
||||
},
|
||||
|
||||
@@ -1541,10 +1541,9 @@ export namespace Provider {
|
||||
}),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(
|
||||
Service,
|
||||
layer.pipe(Layer.provide(Config.defaultLayer), Layer.provide(Auth.defaultLayer)),
|
||||
)
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Config.defaultLayer), Layer.provide(Auth.defaultLayer))
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function list() {
|
||||
return runPromise((svc) => svc.list())
|
||||
|
||||
@@ -17,6 +17,7 @@ import { NotFoundError } from "@/storage/db"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Effect, Layer, ServiceMap } from "effect"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { isOverflow as overflow } from "./overflow"
|
||||
|
||||
export namespace SessionCompaction {
|
||||
@@ -213,6 +214,7 @@ When constructing the summary, try to stick to this template:
|
||||
const msgs = structuredClone(messages)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
|
||||
const ctx = yield* InstanceState.context
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
@@ -223,8 +225,8 @@ When constructing the summary, try to stick to this template:
|
||||
variant: userMessage.variant,
|
||||
summary: true,
|
||||
path: {
|
||||
cwd: Instance.directory,
|
||||
root: Instance.worktree,
|
||||
cwd: ctx.directory,
|
||||
root: ctx.worktree,
|
||||
},
|
||||
cost: 0,
|
||||
tokens: {
|
||||
|
||||
@@ -19,6 +19,7 @@ import { Log } from "../util/log"
|
||||
import { updateSchema } from "../util/update-schema"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { Instance } from "../project/instance"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { SessionPrompt } from "./prompt"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Command } from "../command"
|
||||
@@ -382,11 +383,12 @@ export namespace Session {
|
||||
directory: string
|
||||
permission?: Permission.Ruleset
|
||||
}) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const result: Info = {
|
||||
id: SessionID.descending(input.id),
|
||||
slug: Slug.create(),
|
||||
version: Installation.VERSION,
|
||||
projectID: Instance.project.id,
|
||||
projectID: ctx.project.id,
|
||||
directory: input.directory,
|
||||
workspaceID: input.workspaceID,
|
||||
parentID: input.parentID,
|
||||
@@ -444,12 +446,12 @@ export namespace Session {
|
||||
})
|
||||
|
||||
const children = Effect.fn("Session.children")(function* (parentID: SessionID) {
|
||||
const project = Instance.project
|
||||
const ctx = yield* InstanceState.context
|
||||
const rows = yield* db((d) =>
|
||||
d
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.where(and(eq(SessionTable.project_id, ctx.project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.all(),
|
||||
)
|
||||
return rows.map(fromRow)
|
||||
@@ -496,9 +498,10 @@ export namespace Session {
|
||||
permission?: Permission.Ruleset
|
||||
workspaceID?: WorkspaceID
|
||||
}) {
|
||||
const directory = yield* InstanceState.directory
|
||||
return yield* createNext({
|
||||
parentID: input?.parentID,
|
||||
directory: Instance.directory,
|
||||
directory,
|
||||
title: input?.title,
|
||||
permission: input?.permission,
|
||||
workspaceID: input?.workspaceID,
|
||||
@@ -506,10 +509,11 @@ export namespace Session {
|
||||
})
|
||||
|
||||
const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) {
|
||||
const directory = yield* InstanceState.directory
|
||||
const original = yield* get(input.sessionID)
|
||||
const title = getForkedTitle(original.title)
|
||||
const session = yield* createNext({
|
||||
directory: Instance.directory,
|
||||
directory,
|
||||
workspaceID: original.workspaceID,
|
||||
title,
|
||||
})
|
||||
|
||||
@@ -148,6 +148,7 @@ export namespace SessionPrompt {
|
||||
})
|
||||
|
||||
const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const parts: PromptInput["parts"] = [{ type: "text", text: template }]
|
||||
const files = ConfigMarkdown.files(template)
|
||||
const seen = new Set<string>()
|
||||
@@ -159,7 +160,7 @@ export namespace SessionPrompt {
|
||||
seen.add(name)
|
||||
const filepath = name.startsWith("~/")
|
||||
? path.join(os.homedir(), name.slice(2))
|
||||
: path.resolve(Instance.worktree, name)
|
||||
: path.resolve(ctx.worktree, name)
|
||||
|
||||
const info = yield* fsys.stat(filepath).pipe(Effect.option)
|
||||
if (Option.isNone(info)) {
|
||||
@@ -553,6 +554,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
msgs: MessageV2.WithParts[]
|
||||
}) {
|
||||
const { task, model, lastUser, sessionID, session, msgs } = input
|
||||
const ctx = yield* InstanceState.context
|
||||
const taskTool = yield* Effect.promise(() => TaskTool.init())
|
||||
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
|
||||
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
|
||||
@@ -563,7 +565,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
mode: task.agent,
|
||||
agent: task.agent,
|
||||
variant: lastUser.variant,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
modelID: taskModel.id,
|
||||
@@ -734,6 +736,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
})
|
||||
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const session = yield* sessions.get(input.sessionID)
|
||||
if (session.revert) {
|
||||
yield* Effect.promise(() => SessionRevert.cleanup(session))
|
||||
@@ -773,7 +776,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
mode: input.agent,
|
||||
agent: input.agent,
|
||||
cost: 0,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
time: { created: Date.now() },
|
||||
role: "assistant",
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
@@ -832,7 +835,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
}
|
||||
|
||||
const args = (invocations[shellName] ?? invocations[""]).args
|
||||
const cwd = Instance.directory
|
||||
const cwd = ctx.directory
|
||||
const shellEnv = yield* plugin.trigger(
|
||||
"shell.env",
|
||||
{ cwd, sessionID: input.sessionID, callID: part.callID },
|
||||
@@ -976,7 +979,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
variant,
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id)))
|
||||
yield* Effect.addFinalizer(() => InstanceState.withALS(() => InstructionPrompt.clear(info.id)))
|
||||
|
||||
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
|
||||
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
|
||||
@@ -1330,6 +1333,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
|
||||
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
|
||||
function* (sessionID: SessionID) {
|
||||
const ctx = yield* InstanceState.context
|
||||
let structured: unknown | undefined
|
||||
let step = 0
|
||||
const session = yield* sessions.get(sessionID)
|
||||
@@ -1421,7 +1425,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
mode: agent.name,
|
||||
agent: agent.name,
|
||||
variant: lastUser.variant,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
modelID: model.id,
|
||||
@@ -1538,7 +1542,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
}),
|
||||
Effect.fnUntraced(function* (exit) {
|
||||
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
|
||||
InstructionPrompt.clear(handle.message.id)
|
||||
yield* InstanceState.withALS(() => InstructionPrompt.clear(handle.message.id))
|
||||
}),
|
||||
)
|
||||
if (outcome === "break") break
|
||||
|
||||
@@ -10,8 +10,9 @@ import { NamedError } from "@opencode-ai/util/error"
|
||||
import z from "zod"
|
||||
import path from "path"
|
||||
import { readFileSync, readdirSync, existsSync } from "fs"
|
||||
import { Installation } from "../installation"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { CHANNEL } from "../installation/meta"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { iife } from "@/util/iife"
|
||||
import { init } from "#db"
|
||||
|
||||
@@ -28,10 +29,9 @@ const log = Log.create({ service: "db" })
|
||||
|
||||
export namespace Database {
|
||||
export function getChannelPath() {
|
||||
const channel = Installation.CHANNEL
|
||||
if (["latest", "beta"].includes(channel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
|
||||
if (["latest", "beta"].includes(CHANNEL) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
|
||||
return path.join(Global.Path.data, "opencode.db")
|
||||
const safe = channel.replace(/[^a-zA-Z0-9._-]/g, "-")
|
||||
const safe = CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")
|
||||
return path.join(Global.Path.data, `opencode-${safe}.db`)
|
||||
}
|
||||
|
||||
@@ -142,10 +142,11 @@ export namespace Database {
|
||||
}
|
||||
|
||||
export function effect(fn: () => any | Promise<any>) {
|
||||
const bound = InstanceState.bind(fn)
|
||||
try {
|
||||
ctx.use().effects.push(fn)
|
||||
ctx.use().effects.push(bound)
|
||||
} catch {
|
||||
fn()
|
||||
bound()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,12 +163,8 @@ export namespace Database {
|
||||
} catch (err) {
|
||||
if (err instanceof Context.NotFound) {
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const result = Client().transaction(
|
||||
(tx: TxOrDb) => {
|
||||
return ctx.provide({ tx, effects }, () => callback(tx))
|
||||
},
|
||||
{ behavior: options?.behavior },
|
||||
)
|
||||
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
|
||||
const result = Client().transaction(txCallback, { behavior: options?.behavior })
|
||||
for (const effect of effects) effect()
|
||||
return result as NotPromise<T>
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import { NodePath } from "@effect/platform-node"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
|
||||
export namespace Worktree {
|
||||
const log = Log.create({ service: "worktree" })
|
||||
@@ -199,6 +200,7 @@ export namespace Worktree {
|
||||
|
||||
const MAX_NAME_ATTEMPTS = 26
|
||||
const candidate = Effect.fn("Worktree.candidate")(function* (root: string, base?: string) {
|
||||
const ctx = yield* InstanceState.context
|
||||
for (const attempt of Array.from({ length: MAX_NAME_ATTEMPTS }, (_, i) => i)) {
|
||||
const name = base ? (attempt === 0 ? base : `${base}-${Slug.create()}`) : Slug.create()
|
||||
const branch = `opencode/${name}`
|
||||
@@ -207,7 +209,7 @@ export namespace Worktree {
|
||||
if (yield* fs.exists(directory).pipe(Effect.orDie)) continue
|
||||
|
||||
const ref = `refs/heads/${branch}`
|
||||
const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: Instance.worktree })
|
||||
const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: ctx.worktree })
|
||||
if (branchCheck.code === 0) continue
|
||||
|
||||
return Info.parse({ name, branch, directory })
|
||||
@@ -216,11 +218,12 @@ export namespace Worktree {
|
||||
})
|
||||
|
||||
const makeWorktreeInfo = Effect.fn("Worktree.makeWorktreeInfo")(function* (name?: string) {
|
||||
if (Instance.project.vcs !== "git") {
|
||||
const ctx = yield* InstanceState.context
|
||||
if (ctx.project.vcs !== "git") {
|
||||
throw new NotGitError({ message: "Worktrees are only supported for git projects" })
|
||||
}
|
||||
|
||||
const root = pathSvc.join(Global.Path.data, "worktree", Instance.project.id)
|
||||
const root = pathSvc.join(Global.Path.data, "worktree", ctx.project.id)
|
||||
yield* fs.makeDirectory(root, { recursive: true }).pipe(Effect.orDie)
|
||||
|
||||
const base = name ? slugify(name) : ""
|
||||
@@ -228,18 +231,20 @@ export namespace Worktree {
|
||||
})
|
||||
|
||||
const setup = Effect.fnUntraced(function* (info: Info) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const created = yield* git(["worktree", "add", "--no-checkout", "-b", info.branch, info.directory], {
|
||||
cwd: Instance.worktree,
|
||||
cwd: ctx.worktree,
|
||||
})
|
||||
if (created.code !== 0) {
|
||||
throw new CreateFailedError({ message: created.stderr || created.text || "Failed to create git worktree" })
|
||||
}
|
||||
|
||||
yield* project.addSandbox(Instance.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
|
||||
yield* project.addSandbox(ctx.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
|
||||
})
|
||||
|
||||
const boot = Effect.fnUntraced(function* (info: Info, startCommand?: string) {
|
||||
const projectID = Instance.project.id
|
||||
const ctx = yield* InstanceState.context
|
||||
const projectID = ctx.project.id
|
||||
const extra = startCommand?.trim()
|
||||
|
||||
const populated = yield* git(["reset", "--hard"], { cwd: info.directory })
|
||||
|
||||
@@ -16,21 +16,21 @@ const truncate = Layer.effectDiscard(
|
||||
|
||||
const it = testEffect(Layer.merge(AccountRepo.layer, truncate))
|
||||
|
||||
it.effect("list returns empty when no accounts exist", () =>
|
||||
it.live("list returns empty when no accounts exist", () =>
|
||||
Effect.gen(function* () {
|
||||
const accounts = yield* AccountRepo.use((r) => r.list())
|
||||
expect(accounts).toEqual([])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("active returns none when no accounts exist", () =>
|
||||
it.live("active returns none when no accounts exist", () =>
|
||||
Effect.gen(function* () {
|
||||
const active = yield* AccountRepo.use((r) => r.active())
|
||||
expect(Option.isNone(active)).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persistAccount inserts and getRow retrieves", () =>
|
||||
it.live("persistAccount inserts and getRow retrieves", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
yield* AccountRepo.use((r) =>
|
||||
@@ -56,7 +56,7 @@ it.effect("persistAccount inserts and getRow retrieves", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persistAccount sets the active account and org", () =>
|
||||
it.live("persistAccount sets the active account and org", () =>
|
||||
Effect.gen(function* () {
|
||||
const id1 = AccountID.make("user-1")
|
||||
const id2 = AccountID.make("user-2")
|
||||
@@ -93,7 +93,7 @@ it.effect("persistAccount sets the active account and org", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("list returns all accounts", () =>
|
||||
it.live("list returns all accounts", () =>
|
||||
Effect.gen(function* () {
|
||||
const id1 = AccountID.make("user-1")
|
||||
const id2 = AccountID.make("user-2")
|
||||
@@ -128,7 +128,7 @@ it.effect("list returns all accounts", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("remove deletes an account", () =>
|
||||
it.live("remove deletes an account", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -151,7 +151,7 @@ it.effect("remove deletes an account", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("use stores the selected org and marks the account active", () =>
|
||||
it.live("use stores the selected org and marks the account active", () =>
|
||||
Effect.gen(function* () {
|
||||
const id1 = AccountID.make("user-1")
|
||||
const id2 = AccountID.make("user-2")
|
||||
@@ -191,7 +191,7 @@ it.effect("use stores the selected org and marks the account active", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persistToken updates token fields", () =>
|
||||
it.live("persistToken updates token fields", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -225,7 +225,7 @@ it.effect("persistToken updates token fields", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persistToken with no expiry sets token_expiry to null", () =>
|
||||
it.live("persistToken with no expiry sets token_expiry to null", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -255,7 +255,7 @@ it.effect("persistToken with no expiry sets token_expiry to null", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persistAccount upserts on conflict", () =>
|
||||
it.live("persistAccount upserts on conflict", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -295,7 +295,7 @@ it.effect("persistAccount upserts on conflict", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("remove clears active state when deleting the active account", () =>
|
||||
it.live("remove clears active state when deleting the active account", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -318,7 +318,7 @@ it.effect("remove clears active state when deleting the active account", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("getRow returns none for nonexistent account", () =>
|
||||
it.live("getRow returns none for nonexistent account", () =>
|
||||
Effect.gen(function* () {
|
||||
const row = yield* AccountRepo.use((r) => r.getRow(AccountID.make("nope")))
|
||||
expect(Option.isNone(row)).toBe(true)
|
||||
|
||||
@@ -54,7 +54,7 @@ const deviceTokenClient = (body: unknown, status = 400) =>
|
||||
const poll = (body: unknown, status = 400) =>
|
||||
Account.Service.use((s) => s.poll(login())).pipe(Effect.provide(live(deviceTokenClient(body, status))))
|
||||
|
||||
it.effect("orgsByAccount groups orgs per account", () =>
|
||||
it.live("orgsByAccount groups orgs per account", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* AccountRepo.use((r) =>
|
||||
r.persistAccount({
|
||||
@@ -107,7 +107,7 @@ it.effect("orgsByAccount groups orgs per account", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("token refresh persists the new token", () =>
|
||||
it.live("token refresh persists the new token", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -148,7 +148,7 @@ it.effect("token refresh persists the new token", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("config sends the selected org header", () =>
|
||||
it.live("config sends the selected org header", () =>
|
||||
Effect.gen(function* () {
|
||||
const id = AccountID.make("user-1")
|
||||
|
||||
@@ -188,7 +188,7 @@ it.effect("config sends the selected org header", () =>
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("poll stores the account and first org on success", () =>
|
||||
it.live("poll stores the account and first org on success", () =>
|
||||
Effect.gen(function* () {
|
||||
const client = HttpClient.make((req) =>
|
||||
Effect.succeed(
|
||||
@@ -259,7 +259,7 @@ for (const [name, body, expectedTag] of [
|
||||
"PollExpired",
|
||||
],
|
||||
] as const) {
|
||||
it.effect(`poll returns ${name} for ${body.error}`, () =>
|
||||
it.live(`poll returns ${name} for ${body.error}`, () =>
|
||||
Effect.gen(function* () {
|
||||
const result = yield* poll(body)
|
||||
expect(result._tag).toBe(expectedTag)
|
||||
@@ -267,7 +267,7 @@ for (const [name, body, expectedTag] of [
|
||||
)
|
||||
}
|
||||
|
||||
it.effect("poll returns poll error for other OAuth errors", () =>
|
||||
it.live("poll returns poll error for other OAuth errors", () =>
|
||||
Effect.gen(function* () {
|
||||
const result = yield* poll({
|
||||
error: "server_error",
|
||||
|
||||
@@ -22,7 +22,7 @@ const live = Layer.mergeAll(Bus.layer, node)
|
||||
const it = testEffect(live)
|
||||
|
||||
describe("Bus (Effect-native)", () => {
|
||||
it.effect("publish + subscribe stream delivers events", () =>
|
||||
it.live("publish + subscribe stream delivers events", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
@@ -46,7 +46,7 @@ describe("Bus (Effect-native)", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("subscribe filters by event type", () =>
|
||||
it.live("subscribe filters by event type", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
@@ -70,7 +70,7 @@ describe("Bus (Effect-native)", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("subscribeAll receives all types", () =>
|
||||
it.live("subscribeAll receives all types", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
@@ -95,7 +95,7 @@ describe("Bus (Effect-native)", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("multiple subscribers each receive the event", () =>
|
||||
it.live("multiple subscribers each receive the event", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
@@ -129,7 +129,7 @@ describe("Bus (Effect-native)", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
|
||||
it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped()
|
||||
const types: string[] = []
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { afterEach, expect, test } from "bun:test"
|
||||
import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect"
|
||||
import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect"
|
||||
import { InstanceState } from "../../src/effect/instance-state"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
@@ -382,3 +383,100 @@ test("InstanceState dedupes concurrent lookups", async () => {
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState survives deferred resume from the same instance context", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResume") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const gate = await Effect.runPromise(Deferred.make<void>())
|
||||
const fiber = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))),
|
||||
})
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)),
|
||||
})
|
||||
const exit = await Effect.runPromise(Fiber.await(fiber))
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
expect(exit.value).toBe(tmp.path)
|
||||
}
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResumeOutside") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const gate = await Effect.runPromise(Deferred.make<void>())
|
||||
// Provide InstanceRef so the fiber carries the context even when
|
||||
// the deferred is resolved from outside Instance.provide ALS.
|
||||
const fiber = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () =>
|
||||
Promise.resolve(
|
||||
rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))),
|
||||
),
|
||||
})
|
||||
|
||||
// Resume from outside any Instance.provide — ALS is NOT set here
|
||||
await Effect.runPromise(Deferred.succeed(gate, void 0))
|
||||
const exit = await Effect.runPromise(Fiber.await(fiber))
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
expect(exit.value).toBe(tmp.path)
|
||||
}
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -6,7 +6,7 @@ import { it } from "../lib/effect"
|
||||
describe("Runner", () => {
|
||||
// --- ensureRunning semantics ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"ensureRunning starts work and returns result",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -18,7 +18,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"ensureRunning propagates work failures",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -29,7 +29,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"concurrent callers share the same run",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -51,7 +51,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"concurrent callers all receive same error",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -71,7 +71,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"ensureRunning can be called again after previous run completes",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -81,7 +81,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"second ensureRunning ignores new work if already running",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -110,7 +110,7 @@ describe("Runner", () => {
|
||||
|
||||
// --- cancel semantics ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel interrupts running work",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -128,7 +128,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel on idle is a no-op",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -138,7 +138,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel with onInterrupt resolves callers gracefully",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -154,7 +154,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel with queued callers resolves all",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -175,7 +175,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"work can be started after cancel",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -245,7 +245,7 @@ describe("Runner", () => {
|
||||
|
||||
// --- shell semantics ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"shell runs exclusively",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -256,7 +256,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"shell rejects when run is active",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -272,7 +272,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"shell rejects when another shell is running",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -292,7 +292,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"shell rejects via busy callback and cancel still stops the first shell",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -323,7 +323,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel interrupts shell that ignores abort signal",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -349,7 +349,7 @@ describe("Runner", () => {
|
||||
|
||||
// --- shell→run handoff ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"ensureRunning queues behind shell then runs after",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -376,7 +376,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"multiple ensureRunning callers share the queued run behind shell",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -407,7 +407,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"cancel during shell_then_run cancels both",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -441,7 +441,7 @@ describe("Runner", () => {
|
||||
|
||||
// --- lifecycle callbacks ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"onIdle fires when returning to idle from running",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -454,7 +454,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"onIdle fires on cancel",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -470,7 +470,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"onBusy fires when shell starts",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -485,7 +485,7 @@ describe("Runner", () => {
|
||||
|
||||
// --- busy flag ---
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"busy is true during run",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
@@ -502,7 +502,7 @@ describe("Runner", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect(
|
||||
it.live(
|
||||
"busy is true during shell",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
|
||||
@@ -2,10 +2,14 @@ import { $ } from "bun"
|
||||
import * as fs from "fs/promises"
|
||||
import os from "os"
|
||||
import path from "path"
|
||||
import { Effect, FileSystem, ServiceMap } from "effect"
|
||||
import { Effect, ServiceMap } from "effect"
|
||||
import type * as PlatformError from "effect/PlatformError"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import type { Config } from "../../src/config/config"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { TestLLMServer } from "../lib/llm-server"
|
||||
|
||||
// Strip null bytes from paths (defensive fix for CI environment issues)
|
||||
function sanitizePath(p: string): string {
|
||||
@@ -78,9 +82,17 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
|
||||
/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
|
||||
export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
|
||||
return Effect.gen(function* () {
|
||||
const fs = yield* FileSystem.FileSystem
|
||||
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
||||
const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
|
||||
const dirpath = sanitizePath(path.join(os.tmpdir(), "opencode-test-" + Math.random().toString(36).slice(2)))
|
||||
yield* Effect.promise(() => fs.mkdir(dirpath, { recursive: true }))
|
||||
const dir = sanitizePath(yield* Effect.promise(() => fs.realpath(dirpath)))
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.promise(async () => {
|
||||
if (options?.git) await stop(dir).catch(() => undefined)
|
||||
await clean(dir).catch(() => undefined)
|
||||
}),
|
||||
)
|
||||
|
||||
const git = (...args: string[]) =>
|
||||
spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
|
||||
@@ -94,9 +106,11 @@ export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.
|
||||
}
|
||||
|
||||
if (options?.config) {
|
||||
yield* fs.writeFileString(
|
||||
path.join(dir, "opencode.json"),
|
||||
JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
|
||||
yield* Effect.promise(() =>
|
||||
fs.writeFile(
|
||||
path.join(dir, "opencode.json"),
|
||||
JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -111,7 +125,7 @@ export const provideInstance =
|
||||
Effect.promise<A>(async () =>
|
||||
Instance.provide({
|
||||
directory,
|
||||
fn: () => Effect.runPromiseWith(services)(self),
|
||||
fn: () => Effect.runPromiseWith(services)(self.pipe(Effect.provideService(InstanceRef, Instance.current))),
|
||||
}),
|
||||
),
|
||||
)
|
||||
@@ -139,3 +153,20 @@ export function provideTmpdirInstance<A, E, R>(
|
||||
return yield* self(path).pipe(provideInstance(path))
|
||||
})
|
||||
}
|
||||
|
||||
export function provideTmpdirServer<A, E, R>(
|
||||
self: (input: { dir: string; llm: TestLLMServer["Service"] }) => Effect.Effect<A, E, R>,
|
||||
options?: { git?: boolean; config?: (url: string) => Partial<Config.Info> },
|
||||
): Effect.Effect<
|
||||
A,
|
||||
E | PlatformError.PlatformError,
|
||||
R | TestLLMServer | ChildProcessSpawner.ChildProcessSpawner | Scope.Scope
|
||||
> {
|
||||
return Effect.gen(function* () {
|
||||
const llm = yield* TestLLMServer
|
||||
return yield* provideTmpdirInstance((dir) => self({ dir, llm }), {
|
||||
git: options?.git,
|
||||
config: options?.config?.(llm.url),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import * as Formatter from "../../src/format/formatter"
|
||||
const it = testEffect(Layer.mergeAll(Format.defaultLayer, CrossSpawnSpawner.defaultLayer, NodeFileSystem.layer))
|
||||
|
||||
describe("Format", () => {
|
||||
it.effect("status() returns built-in formatters when no config overrides", () =>
|
||||
it.live("status() returns built-in formatters when no config overrides", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Format.Service.use((fmt) =>
|
||||
Effect.gen(function* () {
|
||||
@@ -32,7 +32,7 @@ describe("Format", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("status() returns empty list when formatter is disabled", () =>
|
||||
it.live("status() returns empty list when formatter is disabled", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Format.Service.use((fmt) =>
|
||||
@@ -44,7 +44,7 @@ describe("Format", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("status() excludes formatters marked as disabled in config", () =>
|
||||
it.live("status() excludes formatters marked as disabled in config", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Format.Service.use((fmt) =>
|
||||
@@ -64,11 +64,11 @@ describe("Format", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("service initializes without error", () =>
|
||||
it.live("service initializes without error", () =>
|
||||
provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
|
||||
)
|
||||
|
||||
it.effect("status() initializes formatter state per directory", () =>
|
||||
it.live("status() initializes formatter state per directory", () =>
|
||||
Effect.gen(function* () {
|
||||
const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
|
||||
config: { formatter: false },
|
||||
@@ -80,7 +80,7 @@ describe("Format", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("runs enabled checks for matching formatters in parallel", () =>
|
||||
it.live("runs enabled checks for matching formatters in parallel", () =>
|
||||
provideTmpdirInstance((path) =>
|
||||
Effect.gen(function* () {
|
||||
const file = `${path}/test.parallel`
|
||||
@@ -144,7 +144,7 @@ describe("Format", () => {
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("runs matching formatters sequentially for the same file", () =>
|
||||
it.live("runs matching formatters sequentially for the same file", () =>
|
||||
provideTmpdirInstance(
|
||||
(path) =>
|
||||
Effect.gen(function* () {
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
import { test, type TestOptions } from "bun:test"
|
||||
import { Cause, Effect, Exit, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as TestClock from "effect/testing/TestClock"
|
||||
import * as TestConsole from "effect/testing/TestConsole"
|
||||
|
||||
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
|
||||
const env = TestConsole.layer
|
||||
|
||||
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
|
||||
|
||||
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2, never>) =>
|
||||
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
|
||||
if (Exit.isFailure(exit)) {
|
||||
@@ -19,19 +19,35 @@ const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer
|
||||
return yield* exit
|
||||
}).pipe(Effect.runPromise)
|
||||
|
||||
const make = <R, E>(layer: Layer.Layer<R, E, never>) => {
|
||||
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
|
||||
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test(name, () => run(value, layer), opts)
|
||||
test(name, () => run(value, testLayer), opts)
|
||||
|
||||
effect.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.only(name, () => run(value, layer), opts)
|
||||
test.only(name, () => run(value, testLayer), opts)
|
||||
|
||||
effect.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.skip(name, () => run(value, layer), opts)
|
||||
test.skip(name, () => run(value, testLayer), opts)
|
||||
|
||||
return { effect }
|
||||
const live = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test(name, () => run(value, liveLayer), opts)
|
||||
|
||||
live.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.only(name, () => run(value, liveLayer), opts)
|
||||
|
||||
live.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test.skip(name, () => run(value, liveLayer), opts)
|
||||
|
||||
return { effect, live }
|
||||
}
|
||||
|
||||
export const it = make(env)
|
||||
// Test environment with TestClock and TestConsole
|
||||
const testEnv = Layer.mergeAll(TestConsole.layer, TestClock.layer())
|
||||
|
||||
export const testEffect = <R, E>(layer: Layer.Layer<R, E, never>) => make(Layer.provideMerge(layer, env))
|
||||
// Live environment - uses real clock, but keeps TestConsole for output capture
|
||||
const liveEnv = TestConsole.layer
|
||||
|
||||
export const it = make(testEnv, liveEnv)
|
||||
|
||||
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
|
||||
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))
|
||||
|
||||
422
packages/opencode/test/lib/llm-server.ts
Normal file
422
packages/opencode/test/lib/llm-server.ts
Normal file
@@ -0,0 +1,422 @@
|
||||
import { NodeHttpServer, NodeHttpServerRequest } from "@effect/platform-node"
|
||||
import * as Http from "node:http"
|
||||
import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect"
|
||||
import * as HttpServer from "effect/unstable/http/HttpServer"
|
||||
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
|
||||
export type Usage = { input: number; output: number }
|
||||
|
||||
type Line = Record<string, unknown>
|
||||
|
||||
type Hit = {
|
||||
url: URL
|
||||
body: Record<string, unknown>
|
||||
}
|
||||
|
||||
type Wait = {
|
||||
count: number
|
||||
ready: Deferred.Deferred<void>
|
||||
}
|
||||
|
||||
type Sse = {
|
||||
type: "sse"
|
||||
head: unknown[]
|
||||
tail: unknown[]
|
||||
wait?: PromiseLike<unknown>
|
||||
hang?: boolean
|
||||
error?: unknown
|
||||
reset?: boolean
|
||||
}
|
||||
|
||||
type HttpError = {
|
||||
type: "http-error"
|
||||
status: number
|
||||
body: unknown
|
||||
}
|
||||
|
||||
export type Item = Sse | HttpError
|
||||
|
||||
const done = Symbol("done")
|
||||
|
||||
function line(input: unknown) {
|
||||
if (input === done) return "data: [DONE]\n\n"
|
||||
return `data: ${JSON.stringify(input)}\n\n`
|
||||
}
|
||||
|
||||
function tokens(input?: Usage) {
|
||||
if (!input) return
|
||||
return {
|
||||
prompt_tokens: input.input,
|
||||
completion_tokens: input.output,
|
||||
total_tokens: input.input + input.output,
|
||||
}
|
||||
}
|
||||
|
||||
function chunk(input: { delta?: Record<string, unknown>; finish?: string; usage?: Usage }) {
|
||||
return {
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [
|
||||
{
|
||||
delta: input.delta ?? {},
|
||||
...(input.finish ? { finish_reason: input.finish } : {}),
|
||||
},
|
||||
],
|
||||
...(input.usage ? { usage: tokens(input.usage) } : {}),
|
||||
} satisfies Line
|
||||
}
|
||||
|
||||
function role() {
|
||||
return chunk({ delta: { role: "assistant" } })
|
||||
}
|
||||
|
||||
function textLine(value: string) {
|
||||
return chunk({ delta: { content: value } })
|
||||
}
|
||||
|
||||
function reasonLine(value: string) {
|
||||
return chunk({ delta: { reasoning_content: value } })
|
||||
}
|
||||
|
||||
function finishLine(reason: string, usage?: Usage) {
|
||||
return chunk({ finish: reason, usage })
|
||||
}
|
||||
|
||||
function toolStartLine(id: string, name: string) {
|
||||
return chunk({
|
||||
delta: {
|
||||
tool_calls: [
|
||||
{
|
||||
index: 0,
|
||||
id,
|
||||
type: "function",
|
||||
function: {
|
||||
name,
|
||||
arguments: "",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
function toolArgsLine(value: string) {
|
||||
return chunk({
|
||||
delta: {
|
||||
tool_calls: [
|
||||
{
|
||||
index: 0,
|
||||
function: {
|
||||
arguments: value,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
function bytes(input: Iterable<unknown>) {
|
||||
return Stream.fromIterable([...input].map(line)).pipe(Stream.encodeText)
|
||||
}
|
||||
|
||||
function send(item: Sse) {
|
||||
const head = bytes(item.head)
|
||||
const tail = bytes([...item.tail, ...(item.hang || item.error ? [] : [done])])
|
||||
const empty = Stream.fromIterable<Uint8Array>([])
|
||||
const wait = item.wait
|
||||
const body: Stream.Stream<Uint8Array, unknown> = wait
|
||||
? Stream.concat(head, Stream.fromEffect(Effect.promise(() => wait)).pipe(Stream.flatMap(() => tail)))
|
||||
: Stream.concat(head, tail)
|
||||
let end: Stream.Stream<Uint8Array, unknown> = empty
|
||||
if (item.error) end = Stream.concat(empty, Stream.fail(item.error))
|
||||
else if (item.hang) end = Stream.concat(empty, Stream.never)
|
||||
|
||||
return HttpServerResponse.stream(Stream.concat(body, end), { contentType: "text/event-stream" })
|
||||
}
|
||||
|
||||
const reset = Effect.fn("TestLLMServer.reset")(function* (item: Sse) {
|
||||
const req = yield* HttpServerRequest.HttpServerRequest
|
||||
const res = NodeHttpServerRequest.toServerResponse(req)
|
||||
yield* Effect.sync(() => {
|
||||
res.writeHead(200, { "content-type": "text/event-stream" })
|
||||
for (const part of item.head) res.write(line(part))
|
||||
for (const part of item.tail) res.write(line(part))
|
||||
res.destroy(new Error("connection reset"))
|
||||
})
|
||||
yield* Effect.never
|
||||
})
|
||||
|
||||
function fail(item: HttpError) {
|
||||
return HttpServerResponse.text(JSON.stringify(item.body), {
|
||||
status: item.status,
|
||||
contentType: "application/json",
|
||||
})
|
||||
}
|
||||
|
||||
export class Reply {
|
||||
#head: unknown[] = [role()]
|
||||
#tail: unknown[] = []
|
||||
#usage: Usage | undefined
|
||||
#finish: string | undefined
|
||||
#wait: PromiseLike<unknown> | undefined
|
||||
#hang = false
|
||||
#error: unknown
|
||||
#reset = false
|
||||
#seq = 0
|
||||
|
||||
#id() {
|
||||
this.#seq += 1
|
||||
return `call_${this.#seq}`
|
||||
}
|
||||
|
||||
text(value: string) {
|
||||
this.#tail = [...this.#tail, textLine(value)]
|
||||
return this
|
||||
}
|
||||
|
||||
reason(value: string) {
|
||||
this.#tail = [...this.#tail, reasonLine(value)]
|
||||
return this
|
||||
}
|
||||
|
||||
usage(value: Usage) {
|
||||
this.#usage = value
|
||||
return this
|
||||
}
|
||||
|
||||
wait(value: PromiseLike<unknown>) {
|
||||
this.#wait = value
|
||||
return this
|
||||
}
|
||||
|
||||
stop() {
|
||||
this.#finish = "stop"
|
||||
this.#hang = false
|
||||
this.#error = undefined
|
||||
this.#reset = false
|
||||
return this
|
||||
}
|
||||
|
||||
toolCalls() {
|
||||
this.#finish = "tool_calls"
|
||||
this.#hang = false
|
||||
this.#error = undefined
|
||||
this.#reset = false
|
||||
return this
|
||||
}
|
||||
|
||||
tool(name: string, input: unknown) {
|
||||
const id = this.#id()
|
||||
const args = JSON.stringify(input)
|
||||
this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args)]
|
||||
return this.toolCalls()
|
||||
}
|
||||
|
||||
pendingTool(name: string, input: unknown) {
|
||||
const id = this.#id()
|
||||
const args = JSON.stringify(input)
|
||||
const size = Math.max(1, Math.floor(args.length / 2))
|
||||
this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args.slice(0, size))]
|
||||
return this
|
||||
}
|
||||
|
||||
hang() {
|
||||
this.#finish = undefined
|
||||
this.#hang = true
|
||||
this.#error = undefined
|
||||
this.#reset = false
|
||||
return this
|
||||
}
|
||||
|
||||
streamError(error: unknown = "boom") {
|
||||
this.#finish = undefined
|
||||
this.#hang = false
|
||||
this.#error = error
|
||||
this.#reset = false
|
||||
return this
|
||||
}
|
||||
|
||||
reset() {
|
||||
this.#finish = undefined
|
||||
this.#hang = false
|
||||
this.#error = undefined
|
||||
this.#reset = true
|
||||
return this
|
||||
}
|
||||
|
||||
item(): Item {
|
||||
return {
|
||||
type: "sse",
|
||||
head: this.#head,
|
||||
tail: this.#finish ? [...this.#tail, finishLine(this.#finish, this.#usage)] : this.#tail,
|
||||
wait: this.#wait,
|
||||
hang: this.#hang,
|
||||
error: this.#error,
|
||||
reset: this.#reset,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function reply() {
|
||||
return new Reply()
|
||||
}
|
||||
|
||||
export function httpError(status: number, body: unknown): Item {
|
||||
return {
|
||||
type: "http-error",
|
||||
status,
|
||||
body,
|
||||
}
|
||||
}
|
||||
|
||||
export function raw(input: {
|
||||
chunks?: unknown[]
|
||||
head?: unknown[]
|
||||
tail?: unknown[]
|
||||
wait?: PromiseLike<unknown>
|
||||
hang?: boolean
|
||||
error?: unknown
|
||||
reset?: boolean
|
||||
}): Item {
|
||||
return {
|
||||
type: "sse",
|
||||
head: input.head ?? input.chunks ?? [],
|
||||
tail: input.tail ?? [],
|
||||
wait: input.wait,
|
||||
hang: input.hang,
|
||||
error: input.error,
|
||||
reset: input.reset,
|
||||
}
|
||||
}
|
||||
|
||||
function item(input: Item | Reply) {
|
||||
return input instanceof Reply ? input.item() : input
|
||||
}
|
||||
|
||||
namespace TestLLMServer {
|
||||
export interface Service {
|
||||
readonly url: string
|
||||
readonly push: (...input: (Item | Reply)[]) => Effect.Effect<void>
|
||||
readonly text: (value: string, opts?: { usage?: Usage }) => Effect.Effect<void>
|
||||
readonly tool: (name: string, input: unknown) => Effect.Effect<void>
|
||||
readonly toolHang: (name: string, input: unknown) => Effect.Effect<void>
|
||||
readonly reason: (value: string, opts?: { text?: string; usage?: Usage }) => Effect.Effect<void>
|
||||
readonly fail: (message?: unknown) => Effect.Effect<void>
|
||||
readonly error: (status: number, body: unknown) => Effect.Effect<void>
|
||||
readonly hang: Effect.Effect<void>
|
||||
readonly hold: (value: string, wait: PromiseLike<unknown>) => Effect.Effect<void>
|
||||
readonly hits: Effect.Effect<Hit[]>
|
||||
readonly calls: Effect.Effect<number>
|
||||
readonly wait: (count: number) => Effect.Effect<void>
|
||||
readonly inputs: Effect.Effect<Record<string, unknown>[]>
|
||||
readonly pending: Effect.Effect<number>
|
||||
}
|
||||
}
|
||||
|
||||
export class TestLLMServer extends ServiceMap.Service<TestLLMServer, TestLLMServer.Service>()("@test/LLMServer") {
|
||||
static readonly layer = Layer.effect(
|
||||
TestLLMServer,
|
||||
Effect.gen(function* () {
|
||||
const server = yield* HttpServer.HttpServer
|
||||
const router = yield* HttpRouter.HttpRouter
|
||||
|
||||
let hits: Hit[] = []
|
||||
let list: Item[] = []
|
||||
let waits: Wait[] = []
|
||||
|
||||
const queue = (...input: (Item | Reply)[]) => {
|
||||
list = [...list, ...input.map(item)]
|
||||
}
|
||||
|
||||
const notify = Effect.fnUntraced(function* () {
|
||||
const ready = waits.filter((item) => hits.length >= item.count)
|
||||
if (!ready.length) return
|
||||
waits = waits.filter((item) => hits.length < item.count)
|
||||
yield* Effect.forEach(ready, (item) => Deferred.succeed(item.ready, void 0))
|
||||
})
|
||||
|
||||
const pull = () => {
|
||||
const first = list[0]
|
||||
if (!first) return
|
||||
list = list.slice(1)
|
||||
return first
|
||||
}
|
||||
|
||||
yield* router.add(
|
||||
"POST",
|
||||
"/v1/chat/completions",
|
||||
Effect.gen(function* () {
|
||||
const req = yield* HttpServerRequest.HttpServerRequest
|
||||
const next = pull()
|
||||
if (!next) return HttpServerResponse.text("unexpected request", { status: 500 })
|
||||
const body = yield* req.json.pipe(Effect.orElseSucceed(() => ({})))
|
||||
hits = [
|
||||
...hits,
|
||||
{
|
||||
url: new URL(req.originalUrl, "http://localhost"),
|
||||
body: body && typeof body === "object" ? (body as Record<string, unknown>) : {},
|
||||
},
|
||||
]
|
||||
yield* notify()
|
||||
if (next.type === "sse" && next.reset) {
|
||||
yield* reset(next)
|
||||
return HttpServerResponse.empty()
|
||||
}
|
||||
if (next.type === "sse") return send(next)
|
||||
return fail(next)
|
||||
}),
|
||||
)
|
||||
|
||||
yield* server.serve(router.asHttpEffect())
|
||||
|
||||
return TestLLMServer.of({
|
||||
url:
|
||||
server.address._tag === "TcpAddress"
|
||||
? `http://127.0.0.1:${server.address.port}/v1`
|
||||
: `unix://${server.address.path}/v1`,
|
||||
push: Effect.fn("TestLLMServer.push")(function* (...input: (Item | Reply)[]) {
|
||||
queue(...input)
|
||||
}),
|
||||
text: Effect.fn("TestLLMServer.text")(function* (value: string, opts?: { usage?: Usage }) {
|
||||
const out = reply().text(value)
|
||||
if (opts?.usage) out.usage(opts.usage)
|
||||
queue(out.stop().item())
|
||||
}),
|
||||
tool: Effect.fn("TestLLMServer.tool")(function* (name: string, input: unknown) {
|
||||
queue(reply().tool(name, input).item())
|
||||
}),
|
||||
toolHang: Effect.fn("TestLLMServer.toolHang")(function* (name: string, input: unknown) {
|
||||
queue(reply().pendingTool(name, input).hang().item())
|
||||
}),
|
||||
reason: Effect.fn("TestLLMServer.reason")(function* (value: string, opts?: { text?: string; usage?: Usage }) {
|
||||
const out = reply().reason(value)
|
||||
if (opts?.text) out.text(opts.text)
|
||||
if (opts?.usage) out.usage(opts.usage)
|
||||
queue(out.stop().item())
|
||||
}),
|
||||
fail: Effect.fn("TestLLMServer.fail")(function* (message: unknown = "boom") {
|
||||
queue(reply().streamError(message).item())
|
||||
}),
|
||||
error: Effect.fn("TestLLMServer.error")(function* (status: number, body: unknown) {
|
||||
queue(httpError(status, body))
|
||||
}),
|
||||
hang: Effect.gen(function* () {
|
||||
queue(reply().hang().item())
|
||||
}).pipe(Effect.withSpan("TestLLMServer.hang")),
|
||||
hold: Effect.fn("TestLLMServer.hold")(function* (value: string, wait: PromiseLike<unknown>) {
|
||||
queue(reply().wait(wait).text(value).stop().item())
|
||||
}),
|
||||
hits: Effect.sync(() => [...hits]),
|
||||
calls: Effect.sync(() => hits.length),
|
||||
wait: Effect.fn("TestLLMServer.wait")(function* (count: number) {
|
||||
if (hits.length >= count) return
|
||||
const ready = yield* Deferred.make<void>()
|
||||
waits = [...waits, { count, ready }]
|
||||
yield* Deferred.await(ready)
|
||||
}),
|
||||
inputs: Effect.sync(() => hits.map((hit) => hit.body)),
|
||||
pending: Effect.sync(() => list.length),
|
||||
})
|
||||
}),
|
||||
).pipe(Layer.provide(HttpRouter.layer), Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 })))
|
||||
}
|
||||
@@ -1,8 +1,6 @@
|
||||
import { NodeFileSystem } from "@effect/platform-node"
|
||||
import { expect } from "bun:test"
|
||||
import { APICallError } from "ai"
|
||||
import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
|
||||
import * as Stream from "effect/Stream"
|
||||
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
|
||||
import path from "path"
|
||||
import type { Agent } from "../../src/agent/agent"
|
||||
import { Agent as AgentSvc } from "../../src/agent/agent"
|
||||
@@ -10,7 +8,7 @@ import { Bus } from "../../src/bus"
|
||||
import { Config } from "../../src/config/config"
|
||||
import { Permission } from "../../src/permission"
|
||||
import { Plugin } from "../../src/plugin"
|
||||
import type { Provider } from "../../src/provider/provider"
|
||||
import { Provider } from "../../src/provider/provider"
|
||||
import { ModelID, ProviderID } from "../../src/provider/schema"
|
||||
import { Session } from "../../src/session"
|
||||
import { LLM } from "../../src/session/llm"
|
||||
@@ -21,8 +19,9 @@ import { SessionStatus } from "../../src/session/status"
|
||||
import { Snapshot } from "../../src/snapshot"
|
||||
import { Log } from "../../src/util/log"
|
||||
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
|
||||
import { provideTmpdirInstance } from "../fixture/fixture"
|
||||
import { provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { reply, TestLLMServer } from "../lib/llm-server"
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
@@ -31,118 +30,51 @@ const ref = {
|
||||
modelID: ModelID.make("test-model"),
|
||||
}
|
||||
|
||||
type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
|
||||
|
||||
class TestLLM extends ServiceMap.Service<
|
||||
TestLLM,
|
||||
{
|
||||
readonly push: (stream: Script) => Effect.Effect<void>
|
||||
readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
|
||||
readonly calls: Effect.Effect<number>
|
||||
readonly inputs: Effect.Effect<LLM.StreamInput[]>
|
||||
}
|
||||
>()("@test/SessionProcessorLLM") {}
|
||||
|
||||
function stream(...items: LLM.Event[]) {
|
||||
return Stream.make(...items)
|
||||
const cfg = {
|
||||
provider: {
|
||||
test: {
|
||||
name: "Test",
|
||||
id: "test",
|
||||
env: [],
|
||||
npm: "@ai-sdk/openai-compatible",
|
||||
models: {
|
||||
"test-model": {
|
||||
id: "test-model",
|
||||
name: "Test Model",
|
||||
attachment: false,
|
||||
reasoning: false,
|
||||
temperature: false,
|
||||
tool_call: true,
|
||||
release_date: "2025-01-01",
|
||||
limit: { context: 100000, output: 10000 },
|
||||
cost: { input: 0, output: 0 },
|
||||
options: {},
|
||||
},
|
||||
},
|
||||
options: {
|
||||
apiKey: "test-key",
|
||||
baseURL: "http://localhost:1/v1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
function usage(input = 1, output = 1, total = input + output) {
|
||||
function providerCfg(url: string) {
|
||||
return {
|
||||
inputTokens: input,
|
||||
outputTokens: output,
|
||||
totalTokens: total,
|
||||
inputTokenDetails: {
|
||||
noCacheTokens: undefined,
|
||||
cacheReadTokens: undefined,
|
||||
cacheWriteTokens: undefined,
|
||||
},
|
||||
outputTokenDetails: {
|
||||
textTokens: undefined,
|
||||
reasoningTokens: undefined,
|
||||
...cfg,
|
||||
provider: {
|
||||
...cfg.provider,
|
||||
test: {
|
||||
...cfg.provider.test,
|
||||
options: {
|
||||
...cfg.provider.test.options,
|
||||
baseURL: url,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function start(): LLM.Event {
|
||||
return { type: "start" }
|
||||
}
|
||||
|
||||
function textStart(id = "t"): LLM.Event {
|
||||
return { type: "text-start", id }
|
||||
}
|
||||
|
||||
function textDelta(id: string, text: string): LLM.Event {
|
||||
return { type: "text-delta", id, text }
|
||||
}
|
||||
|
||||
function textEnd(id = "t"): LLM.Event {
|
||||
return { type: "text-end", id }
|
||||
}
|
||||
|
||||
function reasoningStart(id: string): LLM.Event {
|
||||
return { type: "reasoning-start", id }
|
||||
}
|
||||
|
||||
function reasoningDelta(id: string, text: string): LLM.Event {
|
||||
return { type: "reasoning-delta", id, text }
|
||||
}
|
||||
|
||||
function reasoningEnd(id: string): LLM.Event {
|
||||
return { type: "reasoning-end", id }
|
||||
}
|
||||
|
||||
function finishStep(): LLM.Event {
|
||||
return {
|
||||
type: "finish-step",
|
||||
finishReason: "stop",
|
||||
rawFinishReason: "stop",
|
||||
response: { id: "res", modelId: "test-model", timestamp: new Date() },
|
||||
providerMetadata: undefined,
|
||||
usage: usage(),
|
||||
}
|
||||
}
|
||||
|
||||
function finish(): LLM.Event {
|
||||
return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
|
||||
}
|
||||
|
||||
function toolInputStart(id: string, toolName: string): LLM.Event {
|
||||
return { type: "tool-input-start", id, toolName }
|
||||
}
|
||||
|
||||
function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
|
||||
return { type: "tool-call", toolCallId, toolName, input }
|
||||
}
|
||||
|
||||
function fail<E>(err: E, ...items: LLM.Event[]) {
|
||||
return stream(...items).pipe(Stream.concat(Stream.fail(err)))
|
||||
}
|
||||
|
||||
function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
|
||||
return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
|
||||
}
|
||||
|
||||
function model(context: number): Provider.Model {
|
||||
return {
|
||||
id: "test-model",
|
||||
providerID: "test",
|
||||
name: "Test",
|
||||
limit: { context, output: 10 },
|
||||
cost: { input: 0, output: 0, cache: { read: 0, write: 0 } },
|
||||
capabilities: {
|
||||
toolcall: true,
|
||||
attachment: false,
|
||||
reasoning: false,
|
||||
temperature: true,
|
||||
input: { text: true, image: false, audio: false, video: false },
|
||||
output: { text: true, image: false, audio: false, video: false },
|
||||
},
|
||||
api: { npm: "@ai-sdk/anthropic" },
|
||||
options: {},
|
||||
} as Provider.Model
|
||||
}
|
||||
|
||||
function agent(): Agent.Info {
|
||||
return {
|
||||
name: "build",
|
||||
@@ -211,43 +143,6 @@ const assistant = Effect.fn("TestSession.assistant")(function* (
|
||||
return msg
|
||||
})
|
||||
|
||||
const llm = Layer.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const queue: Script[] = []
|
||||
const inputs: LLM.StreamInput[] = []
|
||||
let calls = 0
|
||||
|
||||
const push = Effect.fn("TestLLM.push")((item: Script) => {
|
||||
queue.push(item)
|
||||
return Effect.void
|
||||
})
|
||||
|
||||
const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
|
||||
return Layer.mergeAll(
|
||||
Layer.succeed(
|
||||
LLM.Service,
|
||||
LLM.Service.of({
|
||||
stream: (input) => {
|
||||
calls += 1
|
||||
inputs.push(input)
|
||||
const item = queue.shift() ?? Stream.empty
|
||||
return typeof item === "function" ? item(input) : item
|
||||
},
|
||||
}),
|
||||
),
|
||||
Layer.succeed(
|
||||
TestLLM,
|
||||
TestLLM.of({
|
||||
push,
|
||||
reply,
|
||||
calls: Effect.sync(() => calls),
|
||||
inputs: Effect.sync(() => [...inputs]),
|
||||
}),
|
||||
),
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
||||
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
|
||||
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
|
||||
const deps = Layer.mergeAll(
|
||||
@@ -257,27 +152,37 @@ const deps = Layer.mergeAll(
|
||||
Permission.layer,
|
||||
Plugin.defaultLayer,
|
||||
Config.defaultLayer,
|
||||
LLM.defaultLayer,
|
||||
Provider.defaultLayer,
|
||||
status,
|
||||
llm,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
|
||||
const env = Layer.mergeAll(TestLLMServer.layer, SessionProcessor.layer.pipe(Layer.provideMerge(deps)))
|
||||
|
||||
const it = testEffect(env)
|
||||
|
||||
it.effect("session.processor effect tests capture llm input cleanly", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const boot = Effect.fn("test.boot")(function* () {
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const provider = yield* Provider.Service
|
||||
return { processors, session, provider }
|
||||
})
|
||||
|
||||
yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish())
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
it.live("session.processor effect tests capture llm input cleanly", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* llm.text("hello")
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "hi")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -303,46 +208,29 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
|
||||
|
||||
const value = yield* handle.process(input)
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const calls = yield* test.calls
|
||||
const inputs = yield* test.inputs
|
||||
const calls = yield* llm.calls
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(calls).toBe(1)
|
||||
expect(inputs).toHaveLength(1)
|
||||
expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }])
|
||||
expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true)
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests stop after token overflow requests compaction", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests stop after token overflow requests compaction", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.reply(
|
||||
start(),
|
||||
{
|
||||
type: "finish-step",
|
||||
finishReason: "stop",
|
||||
rawFinishReason: "stop",
|
||||
response: { id: "res", modelId: "test-model", timestamp: new Date() },
|
||||
providerMetadata: undefined,
|
||||
usage: usage(100, 0, 100),
|
||||
},
|
||||
textStart(),
|
||||
textDelta("t", "after"),
|
||||
textEnd(),
|
||||
)
|
||||
yield* llm.text("after", { usage: { input: 100, output: 0 } })
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "compact")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(20)
|
||||
const base = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const mdl = { ...base, limit: { context: 20, output: 10 } }
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -369,51 +257,73 @@ it.effect("session.processor effect tests stop after token overflow requests com
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
|
||||
expect(value).toBe("compact")
|
||||
expect(parts.some((part) => part.type === "text")).toBe(false)
|
||||
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
|
||||
expect(parts.some((part) => part.type === "step-finish")).toBe(true)
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests reset reasoning state across retries", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests capture reasoning from http mock", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.push(
|
||||
fail(
|
||||
new APICallError({
|
||||
message: "boom",
|
||||
url: "https://example.com/v1/chat/completions",
|
||||
requestBodyValues: {},
|
||||
statusCode: 503,
|
||||
responseHeaders: { "retry-after-ms": "0" },
|
||||
responseBody: '{"error":"boom"}',
|
||||
isRetryable: true,
|
||||
}),
|
||||
start(),
|
||||
reasoningStart("r"),
|
||||
reasoningDelta("r", "one"),
|
||||
),
|
||||
)
|
||||
|
||||
yield* test.reply(
|
||||
start(),
|
||||
reasoningStart("r"),
|
||||
reasoningDelta("r", "two"),
|
||||
reasoningEnd("r"),
|
||||
finishStep(),
|
||||
finish(),
|
||||
)
|
||||
yield* llm.push(reply().reason("think").text("done").stop())
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "reason")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
model: mdl,
|
||||
})
|
||||
|
||||
const value = yield* handle.process({
|
||||
user: {
|
||||
id: parent.id,
|
||||
sessionID: chat.id,
|
||||
role: "user",
|
||||
time: parent.time,
|
||||
agent: parent.agent,
|
||||
model: { providerID: ref.providerID, modelID: ref.modelID },
|
||||
} satisfies MessageV2.User,
|
||||
sessionID: chat.id,
|
||||
model: mdl,
|
||||
agent: agent(),
|
||||
system: [],
|
||||
messages: [{ role: "user", content: "reason" }],
|
||||
tools: {},
|
||||
})
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
|
||||
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(yield* llm.calls).toBe(1)
|
||||
expect(reasoning?.text).toBe("think")
|
||||
expect(text?.text).toBe("done")
|
||||
}),
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("session.processor effect tests reset reasoning state across retries", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* llm.push(reply().reason("one").reset(), reply().reason("two").stop())
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "reason")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -441,28 +351,26 @@ it.effect("session.processor effect tests reset reasoning state across retries",
|
||||
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(yield* test.calls).toBe(2)
|
||||
expect(yield* llm.calls).toBe(2)
|
||||
expect(reasoning.some((part) => part.text === "two")).toBe(true)
|
||||
expect(reasoning.some((part) => part.text === "onetwo")).toBe(false)
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests do not retry unknown json errors", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests do not retry unknown json errors", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.push(fail({ error: { message: "no_kv_space" } }, start()))
|
||||
yield* llm.error(400, { error: { message: "no_kv_space" } })
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "json")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -487,29 +395,26 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
|
||||
})
|
||||
|
||||
expect(value).toBe("stop")
|
||||
expect(yield* test.calls).toBe(1)
|
||||
expect(yield* test.inputs).toHaveLength(1)
|
||||
expect(handle.message.error?.name).toBe("UnknownError")
|
||||
expect(yield* llm.calls).toBe(1)
|
||||
expect(handle.message.error?.name).toBe("APIError")
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests retry recognized structured json errors", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests retry recognized structured json errors", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start()))
|
||||
yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish())
|
||||
yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } })
|
||||
yield* llm.text("after")
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "retry json")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -536,43 +441,28 @@ it.effect("session.processor effect tests retry recognized structured json error
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(yield* test.calls).toBe(2)
|
||||
expect(yield* llm.calls).toBe(2)
|
||||
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
|
||||
expect(handle.message.error).toBeUndefined()
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests publish retry status updates", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests publish retry status updates", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
const bus = yield* Bus.Service
|
||||
|
||||
yield* test.push(
|
||||
fail(
|
||||
new APICallError({
|
||||
message: "boom",
|
||||
url: "https://example.com/v1/chat/completions",
|
||||
requestBodyValues: {},
|
||||
statusCode: 503,
|
||||
responseHeaders: { "retry-after-ms": "0" },
|
||||
responseBody: '{"error":"boom"}',
|
||||
isRetryable: true,
|
||||
}),
|
||||
start(),
|
||||
),
|
||||
)
|
||||
yield* test.reply(start(), finishStep(), finish())
|
||||
yield* llm.error(503, { error: "boom" })
|
||||
yield* llm.text("")
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "retry")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const states: number[] = []
|
||||
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
|
||||
if (evt.properties.sessionID !== chat.id) return
|
||||
@@ -604,27 +494,25 @@ it.effect("session.processor effect tests publish retry status updates", () => {
|
||||
off()
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(yield* test.calls).toBe(2)
|
||||
expect(yield* llm.calls).toBe(2)
|
||||
expect(states).toStrictEqual([1])
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests compact on structured context overflow", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests compact on structured context overflow", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start()))
|
||||
yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } })
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "compact json")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -649,32 +537,25 @@ it.effect("session.processor effect tests compact on structured context overflow
|
||||
})
|
||||
|
||||
expect(value).toBe("compact")
|
||||
expect(yield* test.calls).toBe(1)
|
||||
expect(yield* llm.calls).toBe(1)
|
||||
expect(handle.message.error).toBeUndefined()
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests mark pending tools as aborted on cleanup", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const ready = defer<void>()
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* test.push((input) =>
|
||||
hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe(
|
||||
Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)),
|
||||
),
|
||||
)
|
||||
yield* llm.toolHang("bash", { cmd: "pwd" })
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "tool abort")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -700,7 +581,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(() => ready.promise)
|
||||
yield* llm.wait(1)
|
||||
yield* Effect.promise(async () => {
|
||||
const end = Date.now() + 500
|
||||
while (Date.now() < end) {
|
||||
const parts = await MessageV2.parts(msg.id)
|
||||
if (parts.some((part) => part.type === "tool")) return
|
||||
await Bun.sleep(10)
|
||||
}
|
||||
})
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
@@ -708,45 +597,38 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
|
||||
yield* handle.abort()
|
||||
}
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
|
||||
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
if (Exit.isFailure(exit)) {
|
||||
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
|
||||
}
|
||||
expect(yield* test.calls).toBe(1)
|
||||
expect(tool?.state.status).toBe("error")
|
||||
if (tool?.state.status === "error") {
|
||||
expect(tool.state.error).toBe("Tool execution aborted")
|
||||
expect(tool.state.time.end).toBeDefined()
|
||||
expect(yield* llm.calls).toBe(1)
|
||||
expect(call?.state.status).toBe("error")
|
||||
if (call?.state.status === "error") {
|
||||
expect(call.state.error).toBe("Tool execution aborted")
|
||||
expect(call.state.time.end).toBeDefined()
|
||||
}
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests record aborted errors and idle state", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests record aborted errors and idle state", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const ready = defer<void>()
|
||||
const seen = defer<void>()
|
||||
const test = yield* TestLLM
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const { processors, session, provider } = yield* boot()
|
||||
const bus = yield* Bus.Service
|
||||
const status = yield* SessionStatus.Service
|
||||
const sts = yield* SessionStatus.Service
|
||||
|
||||
yield* test.push((input) =>
|
||||
hang(input, start()).pipe(
|
||||
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
|
||||
),
|
||||
)
|
||||
yield* llm.hang
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "abort")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const errs: string[] = []
|
||||
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
|
||||
if (evt.properties.sessionID !== chat.id) return
|
||||
@@ -779,7 +661,7 @@ it.effect("session.processor effect tests record aborted errors and idle state",
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(() => ready.promise)
|
||||
yield* llm.wait(1)
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
@@ -788,7 +670,7 @@ it.effect("session.processor effect tests record aborted errors and idle state",
|
||||
}
|
||||
yield* Effect.promise(() => seen.promise)
|
||||
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
|
||||
const state = yield* status.get(chat.id)
|
||||
const state = yield* sts.get(chat.id)
|
||||
off()
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
@@ -803,30 +685,23 @@ it.effect("session.processor effect tests record aborted errors and idle state",
|
||||
expect(state).toMatchObject({ type: "idle" })
|
||||
expect(errs).toContain("MessageAbortedError")
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => {
|
||||
return provideTmpdirInstance(
|
||||
(dir) =>
|
||||
it.live("session.processor effect tests mark interruptions aborted without manual abort", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const ready = defer<void>()
|
||||
const processors = yield* SessionProcessor.Service
|
||||
const session = yield* Session.Service
|
||||
const status = yield* SessionStatus.Service
|
||||
const test = yield* TestLLM
|
||||
const { processors, session, provider } = yield* boot()
|
||||
const sts = yield* SessionStatus.Service
|
||||
|
||||
yield* test.push((input) =>
|
||||
hang(input, start()).pipe(
|
||||
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
|
||||
),
|
||||
)
|
||||
yield* llm.hang
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "interrupt")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = model(100)
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
@@ -852,12 +727,12 @@ it.effect("session.processor effect tests mark interruptions aborted without man
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(() => ready.promise)
|
||||
yield* llm.wait(1)
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
|
||||
const state = yield* status.get(chat.id)
|
||||
const state = yield* sts.get(chat.id)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(handle.message.error?.name).toBe("MessageAbortedError")
|
||||
@@ -867,6 +742,6 @@ it.effect("session.processor effect tests mark interruptions aborted without man
|
||||
}
|
||||
expect(state).toMatchObject({ type: "idle" })
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -140,7 +140,7 @@ describe("Truncate", () => {
|
||||
const DAY_MS = 24 * 60 * 60 * 1000
|
||||
const it = testEffect(Layer.mergeAll(TruncateSvc.defaultLayer, NodeFileSystem.layer))
|
||||
|
||||
it.effect("deletes files older than 7 days and preserves recent files", () =>
|
||||
it.live("deletes files older than 7 days and preserves recent files", () =>
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* FileSystem.FileSystem
|
||||
|
||||
|
||||
@@ -4,20 +4,6 @@ export type ClientOptions = {
|
||||
baseUrl: `${string}://${string}` | (string & {})
|
||||
}
|
||||
|
||||
export type EventInstallationUpdated = {
|
||||
type: "installation.updated"
|
||||
properties: {
|
||||
version: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventInstallationUpdateAvailable = {
|
||||
type: "installation.update-available"
|
||||
properties: {
|
||||
version: string
|
||||
}
|
||||
}
|
||||
|
||||
export type Project = {
|
||||
id: string
|
||||
worktree: string
|
||||
@@ -47,6 +33,20 @@ export type EventProjectUpdated = {
|
||||
properties: Project
|
||||
}
|
||||
|
||||
export type EventInstallationUpdated = {
|
||||
type: "installation.updated"
|
||||
properties: {
|
||||
version: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventInstallationUpdateAvailable = {
|
||||
type: "installation.update-available"
|
||||
properties: {
|
||||
version: string
|
||||
}
|
||||
}
|
||||
|
||||
export type EventServerInstanceDisposed = {
|
||||
type: "server.instance.disposed"
|
||||
properties: {
|
||||
@@ -964,9 +964,9 @@ export type EventSessionDeleted = {
|
||||
}
|
||||
|
||||
export type Event =
|
||||
| EventProjectUpdated
|
||||
| EventInstallationUpdated
|
||||
| EventInstallationUpdateAvailable
|
||||
| EventProjectUpdated
|
||||
| EventServerInstanceDisposed
|
||||
| EventServerConnected
|
||||
| EventGlobalDisposed
|
||||
|
||||
Reference in New Issue
Block a user