mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-08 06:54:53 +00:00
Compare commits
3 Commits
v1.3.17
...
opencode/g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
221aa8c19e | ||
|
|
544d012515 | ||
|
|
ba41b6928f |
@@ -78,7 +78,7 @@ export namespace Agent {
|
||||
const provider = yield* Provider.Service
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Agent.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const cfg = yield* config.get()
|
||||
const skillDirs = yield* skill.dirs()
|
||||
const whitelistedDirs = [Truncate.GLOB, ...skillDirs.map((dir) => path.join(dir, "*"))]
|
||||
|
||||
@@ -47,7 +47,7 @@ export namespace Bus {
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Bus.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const wildcard = yield* PubSub.unbounded<Payload>()
|
||||
const typed = new Map<string, PubSub.PubSub<Payload>>()
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ export namespace Command {
|
||||
const mcp = yield* MCP.Service
|
||||
const skill = yield* Skill.Service
|
||||
|
||||
const init = Effect.fn("Command.state")(function* (ctx) {
|
||||
const init = Effect.fnUntraced(function* (ctx) {
|
||||
const cfg = yield* config.get()
|
||||
const commands: Record<string, Info> = {}
|
||||
|
||||
|
||||
@@ -1475,7 +1475,7 @@ export namespace Config {
|
||||
})
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Config.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
return yield* loadInstanceState(ctx)
|
||||
}),
|
||||
)
|
||||
|
||||
26
packages/opencode/src/effect/observability.ts
Normal file
26
packages/opencode/src/effect/observability.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { Layer } from "effect"
|
||||
import { FetchHttpClient } from "effect/unstable/http"
|
||||
import { Otlp } from "effect/unstable/observability"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { CHANNEL, VERSION } from "@/installation/meta"
|
||||
|
||||
export namespace Observability {
|
||||
const base = Flag.OPENCODE_OTLP_BASE_URL?.trim() || undefined
|
||||
|
||||
export const enabled = !!base
|
||||
|
||||
export const layer = !base
|
||||
? Layer.empty
|
||||
: Otlp.layerJson({
|
||||
baseUrl: base,
|
||||
loggerMergeWithExisting: false,
|
||||
resource: {
|
||||
serviceName: "opencode",
|
||||
serviceVersion: VERSION,
|
||||
attributes: {
|
||||
"deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL,
|
||||
"opencode.client": Flag.OPENCODE_CLIENT,
|
||||
},
|
||||
},
|
||||
}).pipe(Layer.provide(FetchHttpClient.layer))
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import * as ServiceMap from "effect/ServiceMap"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Context } from "@/util/context"
|
||||
import { InstanceRef } from "./instance-ref"
|
||||
import { Observability } from "./observability"
|
||||
|
||||
export const memoMap = Layer.makeMemoMapUnsafe()
|
||||
|
||||
@@ -18,7 +19,7 @@ function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
|
||||
|
||||
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
|
||||
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
|
||||
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
|
||||
const getRuntime = () => (rt ??= ManagedRuntime.make(Layer.merge(layer, Observability.layer), { memoMap }))
|
||||
|
||||
return {
|
||||
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(attach(service.use(fn))),
|
||||
|
||||
@@ -346,11 +346,11 @@ export namespace File {
|
||||
const appFs = yield* AppFileSystem.Service
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("File.state")(() =>
|
||||
Effect.succeed({
|
||||
Effect.fnUntraced(function* () {
|
||||
return {
|
||||
cache: { files: [], dirs: [] } as Entry,
|
||||
}),
|
||||
),
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
const scan = Effect.fn("File.scan")(function* () {
|
||||
|
||||
@@ -54,12 +54,12 @@ export namespace FileTime {
|
||||
}
|
||||
})
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("FileTime.state")(() =>
|
||||
Effect.succeed({
|
||||
Effect.fnUntraced(function* () {
|
||||
return {
|
||||
reads: new Map<SessionID, Map<string, Stamp>>(),
|
||||
locks: new Map<string, Semaphore.Semaphore>(),
|
||||
}),
|
||||
),
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
const getLock = Effect.fn("FileTime.lock")(function* (filepath: string) {
|
||||
|
||||
@@ -73,7 +73,7 @@ export namespace FileWatcher {
|
||||
const config = yield* Config.Service
|
||||
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("FileWatcher.state")(
|
||||
Effect.fnUntraced(
|
||||
function* () {
|
||||
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ export namespace Flag {
|
||||
export const OPENCODE_SERVER_PASSWORD = process.env["OPENCODE_SERVER_PASSWORD"]
|
||||
export const OPENCODE_SERVER_USERNAME = process.env["OPENCODE_SERVER_USERNAME"]
|
||||
export const OPENCODE_ENABLE_QUESTION_TOOL = truthy("OPENCODE_ENABLE_QUESTION_TOOL")
|
||||
export const OPENCODE_OTLP_BASE_URL = process.env["OPENCODE_OTLP_BASE_URL"]
|
||||
|
||||
// Experimental
|
||||
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
|
||||
|
||||
@@ -40,7 +40,7 @@ export namespace Format {
|
||||
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
||||
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("Format.state")(function* (_ctx) {
|
||||
Effect.fnUntraced(function* (_ctx) {
|
||||
const commands: Record<string, string[] | false> = {}
|
||||
const formatters: Record<string, Formatter.Info> = {}
|
||||
|
||||
@@ -84,57 +84,106 @@ export namespace Format {
|
||||
return cmd !== false
|
||||
}
|
||||
|
||||
async function getFormatter(ext: string) {
|
||||
const matching = Object.values(formatters).filter((item) => item.extensions.includes(ext))
|
||||
const checks = await Promise.all(
|
||||
matching.map(async (item) => {
|
||||
log.info("checking", { name: item.name, ext })
|
||||
const cmd = await getCommand(item)
|
||||
if (cmd) {
|
||||
log.info("enabled", { name: item.name, ext })
|
||||
}
|
||||
return {
|
||||
item,
|
||||
cmd,
|
||||
}
|
||||
}),
|
||||
)
|
||||
return checks.filter((x) => x.cmd).map((x) => ({ item: x.item, cmd: x.cmd! }))
|
||||
function check(item: Formatter.Info, ext: string) {
|
||||
return Effect.gen(function* () {
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
ext,
|
||||
formatter: item.name,
|
||||
})
|
||||
log.info("checking", { name: item.name, ext })
|
||||
const cmd = yield* Effect.promise(() => getCommand(item))
|
||||
if (cmd) {
|
||||
log.info("enabled", { name: item.name, ext })
|
||||
}
|
||||
yield* Effect.annotateCurrentSpan({ enabled: !!cmd })
|
||||
return {
|
||||
item,
|
||||
cmd,
|
||||
}
|
||||
}).pipe(Effect.withSpan("Format.checkFormatter"))
|
||||
}
|
||||
|
||||
function formatFile(filepath: string) {
|
||||
function resolve(ext: string) {
|
||||
return Effect.gen(function* () {
|
||||
const matching = Object.values(formatters).filter((item) => item.extensions.includes(ext))
|
||||
const checks = yield* Effect.all(matching.map((item) => check(item, ext)))
|
||||
const enabled = checks.filter((item) => item.cmd).map((item) => ({ item: item.item, cmd: item.cmd! }))
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
ext,
|
||||
matched_formatters: matching.map((item) => item.name).join(",") || "none",
|
||||
enabled_formatters: enabled.map((item) => item.item.name).join(",") || "none",
|
||||
})
|
||||
return {
|
||||
matching,
|
||||
enabled,
|
||||
}
|
||||
}).pipe(Effect.withSpan("Format.resolveFormatters"))
|
||||
}
|
||||
|
||||
function spawn(item: Formatter.Info, command: string[], filepath: string) {
|
||||
return Effect.gen(function* () {
|
||||
const dir = yield* InstanceState.directory
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
file: filepath,
|
||||
formatter: item.name,
|
||||
command: command.join(" "),
|
||||
})
|
||||
return yield* spawner.spawn(
|
||||
ChildProcess.make(command[0]!, command.slice(1), {
|
||||
cwd: dir,
|
||||
env: item.environment,
|
||||
extendEnv: true,
|
||||
}),
|
||||
)
|
||||
}).pipe(Effect.withSpan("Format.spawnFormatter"))
|
||||
}
|
||||
|
||||
function wait(
|
||||
handle: ChildProcessSpawner.ChildProcessHandle,
|
||||
item: Formatter.Info,
|
||||
command: string[],
|
||||
filepath: string,
|
||||
) {
|
||||
return Effect.gen(function* () {
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
file: filepath,
|
||||
formatter: item.name,
|
||||
command: command.join(" "),
|
||||
})
|
||||
return yield* handle.exitCode
|
||||
}).pipe(Effect.withSpan("Format.waitFormatter"))
|
||||
}
|
||||
|
||||
function formatFile(filepath: string): Effect.Effect<void, never, never> {
|
||||
return Effect.gen(function* () {
|
||||
log.info("formatting", { file: filepath })
|
||||
const ext = path.extname(filepath)
|
||||
yield* Effect.annotateCurrentSpan({ file: filepath, ext })
|
||||
const fmt = yield* resolve(ext)
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
matched_formatters: fmt.matching.map((item) => item.name).join(",") || "none",
|
||||
enabled_formatters: fmt.enabled.map((item) => item.item.name).join(",") || "none",
|
||||
})
|
||||
|
||||
for (const { item, cmd } of yield* Effect.promise(() => getFormatter(ext))) {
|
||||
for (const { item, cmd } of fmt.enabled) {
|
||||
if (cmd === false) continue
|
||||
log.info("running", { command: cmd })
|
||||
const replaced = cmd.map((x) => x.replace("$FILE", filepath))
|
||||
const dir = yield* InstanceState.directory
|
||||
const code = yield* spawner
|
||||
.spawn(
|
||||
ChildProcess.make(replaced[0]!, replaced.slice(1), {
|
||||
cwd: dir,
|
||||
env: item.environment,
|
||||
extendEnv: true,
|
||||
const code = yield* spawn(item, replaced, filepath).pipe(
|
||||
Effect.flatMap((handle) => wait(handle, item, replaced, filepath)),
|
||||
Effect.scoped,
|
||||
Effect.catch(() =>
|
||||
Effect.sync(() => {
|
||||
log.error("failed to format file", {
|
||||
error: "spawn failed",
|
||||
command: replaced,
|
||||
...item.environment,
|
||||
file: filepath,
|
||||
})
|
||||
return ChildProcessSpawner.ExitCode(1)
|
||||
}),
|
||||
)
|
||||
.pipe(
|
||||
Effect.flatMap((handle) => handle.exitCode),
|
||||
Effect.scoped,
|
||||
Effect.catch(() =>
|
||||
Effect.sync(() => {
|
||||
log.error("failed to format file", {
|
||||
error: "spawn failed",
|
||||
command: cmd,
|
||||
...item.environment,
|
||||
file: filepath,
|
||||
})
|
||||
return ChildProcessSpawner.ExitCode(1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
),
|
||||
)
|
||||
if (code !== 0) {
|
||||
log.error("failed", {
|
||||
command: cmd,
|
||||
|
||||
@@ -164,7 +164,7 @@ export namespace LSP {
|
||||
const config = yield* Config.Service
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("LSP.state")(function* () {
|
||||
Effect.fnUntraced(function* () {
|
||||
const cfg = yield* config.get()
|
||||
|
||||
const servers: Record<string, LSPServer.Info> = {}
|
||||
|
||||
@@ -478,7 +478,7 @@ export namespace MCP {
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("MCP.state")(function* () {
|
||||
Effect.fnUntraced(function* () {
|
||||
const cfg = yield* cfgSvc.get()
|
||||
const config = cfg.mcp ?? {}
|
||||
const s: State = {
|
||||
|
||||
@@ -142,7 +142,7 @@ export namespace Permission {
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Permission.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const row = Database.use((db) =>
|
||||
db.select().from(PermissionTable).where(eq(PermissionTable.project_id, ctx.project.id)).get(),
|
||||
)
|
||||
|
||||
@@ -98,7 +98,7 @@ export namespace Plugin {
|
||||
const config = yield* Config.Service
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Plugin.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const hooks: Hooks[] = []
|
||||
|
||||
const { Server } = yield* Effect.promise(() => import("../server/server"))
|
||||
|
||||
@@ -147,39 +147,37 @@ export namespace Vcs {
|
||||
const bus = yield* Bus.Service
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Vcs.state")((ctx) =>
|
||||
Effect.gen(function* () {
|
||||
if (ctx.project.vcs !== "git") {
|
||||
return { current: undefined, root: undefined }
|
||||
}
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
if (ctx.project.vcs !== "git") {
|
||||
return { current: undefined, root: undefined }
|
||||
}
|
||||
|
||||
const get = Effect.fnUntraced(function* () {
|
||||
return yield* git.branch(ctx.directory)
|
||||
})
|
||||
const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], {
|
||||
concurrency: 2,
|
||||
})
|
||||
const value = { current, root }
|
||||
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
|
||||
const get = Effect.fnUntraced(function* () {
|
||||
return yield* git.branch(ctx.directory)
|
||||
})
|
||||
const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], {
|
||||
concurrency: 2,
|
||||
})
|
||||
const value = { current, root }
|
||||
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
|
||||
|
||||
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
|
||||
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
|
||||
Stream.runForEach((_evt) =>
|
||||
Effect.gen(function* () {
|
||||
const next = yield* get()
|
||||
if (next !== value.current) {
|
||||
log.info("branch changed", { from: value.current, to: next })
|
||||
value.current = next
|
||||
yield* bus.publish(Event.BranchUpdated, { branch: next })
|
||||
}
|
||||
}),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
|
||||
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
|
||||
Stream.runForEach((_evt) =>
|
||||
Effect.gen(function* () {
|
||||
const next = yield* get()
|
||||
if (next !== value.current) {
|
||||
log.info("branch changed", { from: value.current, to: next })
|
||||
value.current = next
|
||||
yield* bus.publish(Event.BranchUpdated, { branch: next })
|
||||
}
|
||||
}),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
return value
|
||||
}),
|
||||
),
|
||||
return value
|
||||
}),
|
||||
)
|
||||
|
||||
return Service.of({
|
||||
|
||||
@@ -117,7 +117,7 @@ export namespace ProviderAuth {
|
||||
const auth = yield* Auth.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("ProviderAuth.state")(function* () {
|
||||
Effect.fnUntraced(function* () {
|
||||
const plugins = yield* plugin.list()
|
||||
return {
|
||||
hooks: Record.fromEntries(
|
||||
|
||||
@@ -133,7 +133,7 @@ export namespace Pty {
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Pty.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const state = {
|
||||
dir: ctx.directory,
|
||||
sessions: new Map<PtyID, Active>(),
|
||||
|
||||
@@ -111,7 +111,7 @@ export namespace Question {
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Question.state")(function* () {
|
||||
Effect.fnUntraced(function* () {
|
||||
const state = {
|
||||
pending: new Map<QuestionID, PendingEntry>(),
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ export namespace SessionCompaction {
|
||||
}
|
||||
})
|
||||
|
||||
const processCompaction = Effect.fn("SessionCompaction.process")(function* (input: {
|
||||
const process = Effect.fn("SessionCompaction.process")(function* (input: {
|
||||
parentID: MessageID
|
||||
messages: MessageV2.WithParts[]
|
||||
sessionID: SessionID
|
||||
@@ -374,7 +374,7 @@ When constructing the summary, try to stick to this template:
|
||||
return Service.of({
|
||||
isOverflow,
|
||||
prune,
|
||||
process: processCompaction,
|
||||
process,
|
||||
create,
|
||||
})
|
||||
}),
|
||||
|
||||
@@ -75,12 +75,12 @@ export namespace Instruction {
|
||||
const http = HttpClient.filterStatusOk(withTransientReadRetry(yield* HttpClient.HttpClient))
|
||||
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("Instruction.state")(() =>
|
||||
Effect.succeed({
|
||||
Effect.fnUntraced(function* () {
|
||||
return {
|
||||
// Track which instruction files have already been attached for a given assistant message.
|
||||
claims: new Map<MessageID, Set<string>>(),
|
||||
}),
|
||||
),
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
const relative = Effect.fnUntraced(function* (instruction: string) {
|
||||
|
||||
@@ -415,9 +415,20 @@ export namespace SessionProcessor {
|
||||
|
||||
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
|
||||
log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined })
|
||||
yield* Effect.logError("session processor failed", {
|
||||
agent: ctx.assistantMessage.agent,
|
||||
modelID: ctx.model.id,
|
||||
providerID: ctx.model.providerID,
|
||||
sessionID: ctx.sessionID,
|
||||
})
|
||||
const error = parse(e)
|
||||
if (MessageV2.ContextOverflowError.isInstance(error)) {
|
||||
ctx.needsCompaction = true
|
||||
yield* Effect.logWarning("session processor requested compaction", {
|
||||
modelID: ctx.model.id,
|
||||
providerID: ctx.model.providerID,
|
||||
sessionID: ctx.sessionID,
|
||||
})
|
||||
yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
|
||||
return
|
||||
}
|
||||
@@ -446,6 +457,18 @@ export namespace SessionProcessor {
|
||||
log.info("process")
|
||||
ctx.needsCompaction = false
|
||||
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
|
||||
yield* Effect.annotateCurrentSpan({
|
||||
agent: streamInput.agent.name,
|
||||
modelID: streamInput.model.id,
|
||||
providerID: streamInput.model.providerID,
|
||||
sessionID: ctx.sessionID,
|
||||
})
|
||||
yield* Effect.logInfo("session processor started", {
|
||||
agent: streamInput.agent.name,
|
||||
modelID: streamInput.model.id,
|
||||
providerID: streamInput.model.providerID,
|
||||
sessionID: ctx.sessionID,
|
||||
})
|
||||
|
||||
return yield* Effect.gen(function* () {
|
||||
yield* Effect.gen(function* () {
|
||||
@@ -459,6 +482,7 @@ export namespace SessionProcessor {
|
||||
Stream.runDrain,
|
||||
)
|
||||
}).pipe(
|
||||
Effect.withSpan("SessionProcessor.stream"),
|
||||
Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
|
||||
Effect.catchCauseIf(
|
||||
(cause) => !Cause.hasInterruptsOnly(cause),
|
||||
@@ -483,6 +507,12 @@ export namespace SessionProcessor {
|
||||
if (aborted && !ctx.assistantMessage.error) {
|
||||
yield* abort()
|
||||
}
|
||||
yield* Effect.logInfo("session processor finished", {
|
||||
aborted,
|
||||
blocked: ctx.blocked,
|
||||
compact: ctx.needsCompaction,
|
||||
sessionID: ctx.sessionID,
|
||||
})
|
||||
if (ctx.needsCompaction) return "compact"
|
||||
if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
|
||||
return "continue"
|
||||
|
||||
@@ -103,7 +103,7 @@ export namespace SessionPrompt {
|
||||
const instruction = yield* Instruction.Service
|
||||
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("SessionPrompt.state")(function* () {
|
||||
Effect.fnUntraced(function* () {
|
||||
const runners = new Map<string, Runner<MessageV2.WithParts>>()
|
||||
yield* Effect.addFinalizer(
|
||||
Effect.fnUntraced(function* () {
|
||||
@@ -1340,12 +1340,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
let structured: unknown | undefined
|
||||
let step = 0
|
||||
const session = yield* sessions.get(sessionID)
|
||||
yield* Effect.annotateCurrentSpan({ sessionID })
|
||||
|
||||
while (true) {
|
||||
yield* status.set(sessionID, { type: "busy" })
|
||||
log.info("loop", { step, sessionID })
|
||||
|
||||
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
|
||||
let msgs = yield* MessageV2.filterCompactedEffect(sessionID).pipe(
|
||||
Effect.withSpan("SessionPrompt.loadMessages"),
|
||||
)
|
||||
|
||||
let lastUser: MessageV2.User | undefined
|
||||
let lastAssistant: MessageV2.Assistant | undefined
|
||||
@@ -1398,13 +1401,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
}
|
||||
|
||||
if (task?.type === "compaction") {
|
||||
const result = yield* compaction.process({
|
||||
messages: msgs,
|
||||
parentID: lastUser.id,
|
||||
sessionID,
|
||||
yield* Effect.logWarning("session compaction task", {
|
||||
auto: task.auto,
|
||||
overflow: task.overflow,
|
||||
sessionID,
|
||||
})
|
||||
const result = yield* compaction
|
||||
.process({
|
||||
messages: msgs,
|
||||
parentID: lastUser.id,
|
||||
sessionID,
|
||||
auto: task.auto,
|
||||
overflow: task.overflow,
|
||||
})
|
||||
.pipe(Effect.withSpan("SessionPrompt.compaction"))
|
||||
if (result === "stop") break
|
||||
continue
|
||||
}
|
||||
@@ -1414,6 +1424,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
lastFinished.summary !== true &&
|
||||
(yield* compaction.isOverflow({ tokens: lastFinished.tokens, model }))
|
||||
) {
|
||||
yield* Effect.logWarning("session overflow detected", { modelID: model.id, sessionID, step })
|
||||
yield* compaction.create({ sessionID, agent: lastUser.agent, model: lastUser.model, auto: true })
|
||||
continue
|
||||
}
|
||||
@@ -1429,6 +1440,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
const maxSteps = agent.steps ?? Infinity
|
||||
const isLastStep = step >= maxSteps
|
||||
msgs = yield* insertReminders({ messages: msgs, agent, session })
|
||||
yield* Effect.logInfo("session turn", {
|
||||
agent: agent.name,
|
||||
modelID: model.id,
|
||||
providerID: model.providerID,
|
||||
sessionID,
|
||||
step,
|
||||
})
|
||||
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
@@ -1503,7 +1521,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
Effect.promise(() => SystemPrompt.environment(model)),
|
||||
instruction.system().pipe(Effect.orDie),
|
||||
Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
|
||||
])
|
||||
]).pipe(Effect.withSpan("SessionPrompt.buildInput"))
|
||||
const system = [...env, ...(skills ? [skills] : []), ...instructions]
|
||||
const format = lastUser.format ?? { type: "text" as const }
|
||||
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
|
||||
|
||||
@@ -58,7 +58,9 @@ export namespace SessionStatus {
|
||||
const bus = yield* Bus.Service
|
||||
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
|
||||
Effect.fnUntraced(function* () {
|
||||
return new Map<SessionID, Info>()
|
||||
}),
|
||||
)
|
||||
|
||||
const get = Effect.fn("SessionStatus.get")(function* (sessionID: SessionID) {
|
||||
|
||||
@@ -99,8 +99,8 @@ export namespace SessionSummary {
|
||||
if (part.type === "step-finish" && part.snapshot) to = part.snapshot
|
||||
}
|
||||
}
|
||||
if (from && to) return yield* snapshot.diffFull(from, to)
|
||||
return []
|
||||
if (!from || !to || from === to) return []
|
||||
return yield* snapshot.diffFull(from, to)
|
||||
})
|
||||
|
||||
const summarize = Effect.fn("SessionSummary.summarize")(function* (input: {
|
||||
|
||||
@@ -144,7 +144,7 @@ export namespace ShareNext {
|
||||
}
|
||||
|
||||
const state: InstanceState<State> = yield* InstanceState.make<State>(
|
||||
Effect.fn("ShareNext.state")(function* (_ctx) {
|
||||
Effect.fnUntraced(function* (_ctx) {
|
||||
const cache: State = { queue: new Map(), scope: yield* Scope.make() }
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
|
||||
@@ -197,7 +197,7 @@ export namespace Skill {
|
||||
const bus = yield* Bus.Service
|
||||
const fsys = yield* AppFileSystem.Service
|
||||
const state = yield* InstanceState.make(
|
||||
Effect.fn("Skill.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const s: State = { skills: {}, dirs: new Set() }
|
||||
yield* loadSkills(s, config, discovery, bus, fsys, ctx.directory, ctx.worktree)
|
||||
return s
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { NodeFileSystem, NodePath } from "@effect/platform-node"
|
||||
import { Cause, Duration, Effect, Layer, Schedule, Semaphore, ServiceMap, Stream } from "effect"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import path from "path"
|
||||
@@ -82,7 +81,7 @@ export namespace Snapshot {
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Snapshot.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const state = {
|
||||
directory: ctx.directory,
|
||||
worktree: ctx.worktree,
|
||||
@@ -150,7 +149,7 @@ export namespace Snapshot {
|
||||
yield* fs.writeFileString(target, text ? `${text}\n` : "").pipe(Effect.orDie)
|
||||
})
|
||||
|
||||
const add = Effect.fnUntraced(function* () {
|
||||
const add = Effect.fn("Snapshot.add")(function* () {
|
||||
yield* sync()
|
||||
const [diff, other] = yield* Effect.all(
|
||||
[
|
||||
@@ -203,7 +202,7 @@ export namespace Snapshot {
|
||||
}
|
||||
})
|
||||
|
||||
const cleanup = Effect.fnUntraced(function* () {
|
||||
const cleanup = Effect.fn("Snapshot.cleanup")(function* () {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
if (!(yield* enabled())) return
|
||||
@@ -221,7 +220,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const track = Effect.fnUntraced(function* () {
|
||||
const track = Effect.fn("Snapshot.track")(function* () {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
if (!(yield* enabled())) return
|
||||
@@ -238,7 +237,9 @@ export namespace Snapshot {
|
||||
log.info("initialized")
|
||||
}
|
||||
yield* add()
|
||||
const result = yield* git(args(["write-tree"]), { cwd: state.directory })
|
||||
const result = yield* git(args(["write-tree"]), { cwd: state.directory }).pipe(
|
||||
Effect.withSpan("Snapshot.writeTree"),
|
||||
)
|
||||
const hash = result.text.trim()
|
||||
log.info("tracking", { hash, cwd: state.directory, git: state.gitdir })
|
||||
return hash
|
||||
@@ -246,7 +247,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const patch = Effect.fnUntraced(function* (hash: string) {
|
||||
const patch = Effect.fn("Snapshot.patch")(function* (hash: string) {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
yield* add()
|
||||
@@ -273,7 +274,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const restore = Effect.fnUntraced(function* (snapshot: string) {
|
||||
const restore = Effect.fn("Snapshot.restore")(function* (snapshot: string) {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
log.info("restore", { commit: snapshot })
|
||||
@@ -299,7 +300,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const revert = Effect.fnUntraced(function* (patches: Snapshot.Patch[]) {
|
||||
const revert = Effect.fn("Snapshot.revert")(function* (patches: Snapshot.Patch[]) {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
const ops: { hash: string; file: string; rel: string }[] = []
|
||||
@@ -414,7 +415,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const diff = Effect.fnUntraced(function* (hash: string) {
|
||||
const diff = Effect.fn("Snapshot.diff")(function* (hash: string) {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
yield* add()
|
||||
@@ -434,7 +435,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const diffFull = Effect.fnUntraced(function* (from: string, to: string) {
|
||||
const diffFull = Effect.fn("Snapshot.diffFull")(function* (from: string, to: string) {
|
||||
return yield* locked(
|
||||
Effect.gen(function* () {
|
||||
type Row = {
|
||||
@@ -451,7 +452,7 @@ export namespace Snapshot {
|
||||
ref: string
|
||||
}
|
||||
|
||||
const show = Effect.fnUntraced(function* (row: Row) {
|
||||
const show = Effect.fn("Snapshot.show")(function* (row: Row) {
|
||||
if (row.binary) return ["", ""]
|
||||
if (row.status === "added") {
|
||||
return [
|
||||
@@ -478,7 +479,7 @@ export namespace Snapshot {
|
||||
)
|
||||
})
|
||||
|
||||
const load = Effect.fnUntraced(
|
||||
const load = Effect.fn("Snapshot.load")(
|
||||
function* (rows: Row[]) {
|
||||
const refs = rows.flatMap((row) => {
|
||||
if (row.binary) return []
|
||||
@@ -583,7 +584,7 @@ export namespace Snapshot {
|
||||
const statuses = yield* git(
|
||||
[...quote, ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."])],
|
||||
{ cwd: state.directory },
|
||||
)
|
||||
).pipe(Effect.withSpan("Snapshot.diffStatus"))
|
||||
|
||||
for (const line of statuses.text.trim().split("\n")) {
|
||||
if (!line) continue
|
||||
@@ -597,7 +598,7 @@ export namespace Snapshot {
|
||||
{
|
||||
cwd: state.directory,
|
||||
},
|
||||
)
|
||||
).pipe(Effect.withSpan("Snapshot.diffNumstat"))
|
||||
|
||||
const rows = numstat.text
|
||||
.trim()
|
||||
@@ -660,30 +661,14 @@ export namespace Snapshot {
|
||||
)
|
||||
|
||||
return Service.of({
|
||||
init: Effect.fn("Snapshot.init")(function* () {
|
||||
yield* InstanceState.get(state)
|
||||
}),
|
||||
cleanup: Effect.fn("Snapshot.cleanup")(function* () {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.cleanup())
|
||||
}),
|
||||
track: Effect.fn("Snapshot.track")(function* () {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.track())
|
||||
}),
|
||||
patch: Effect.fn("Snapshot.patch")(function* (hash: string) {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.patch(hash))
|
||||
}),
|
||||
restore: Effect.fn("Snapshot.restore")(function* (snapshot: string) {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.restore(snapshot))
|
||||
}),
|
||||
revert: Effect.fn("Snapshot.revert")(function* (patches: Snapshot.Patch[]) {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.revert(patches))
|
||||
}),
|
||||
diff: Effect.fn("Snapshot.diff")(function* (hash: string) {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.diff(hash))
|
||||
}),
|
||||
diffFull: Effect.fn("Snapshot.diffFull")(function* (from: string, to: string) {
|
||||
return yield* InstanceState.useEffect(state, (s) => s.diffFull(from, to))
|
||||
}),
|
||||
init: () => InstanceState.get(state).pipe(Effect.asVoid),
|
||||
cleanup: () => InstanceState.useEffect(state, (s) => s.cleanup()),
|
||||
track: () => InstanceState.useEffect(state, (s) => s.track()),
|
||||
patch: (hash: string) => InstanceState.useEffect(state, (s) => s.patch(hash)),
|
||||
restore: (snapshot: string) => InstanceState.useEffect(state, (s) => s.restore(snapshot)),
|
||||
revert: (patches: Snapshot.Patch[]) => InstanceState.useEffect(state, (s) => s.revert(patches)),
|
||||
diff: (hash: string) => InstanceState.useEffect(state, (s) => s.diff(hash)),
|
||||
diffFull: (from: string, to: string) => InstanceState.useEffect(state, (s) => s.diffFull(from, to)),
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
@@ -82,7 +82,7 @@ export namespace ToolRegistry {
|
||||
Effect.isEffect(tool) ? tool : Effect.succeed(tool)
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("ToolRegistry.state")(function* (ctx) {
|
||||
Effect.fnUntraced(function* (ctx) {
|
||||
const custom: Tool.Info[] = []
|
||||
|
||||
function fromPlugin(id: string, def: ToolDefinition): Tool.Info {
|
||||
|
||||
@@ -33,6 +33,7 @@ process.env["XDG_DATA_HOME"] = path.join(dir, "share")
|
||||
process.env["XDG_CACHE_HOME"] = path.join(dir, "cache")
|
||||
process.env["XDG_CONFIG_HOME"] = path.join(dir, "config")
|
||||
process.env["XDG_STATE_HOME"] = path.join(dir, "state")
|
||||
delete process.env["OPENCODE_OTLP_BASE_URL"]
|
||||
process.env["OPENCODE_MODELS_PATH"] = path.join(import.meta.dir, "tool", "fixtures", "models-api.json")
|
||||
|
||||
// Set test home directory to isolate tests from user's actual home directory
|
||||
|
||||
Reference in New Issue
Block a user