mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-30 09:46:35 +00:00
refactor: add Effect logger for motel observability (#21954)
This commit is contained in:
67
packages/opencode/src/effect/logger.ts
Normal file
67
packages/opencode/src/effect/logger.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { Cause, Effect, Logger, References } from "effect"
|
||||
import { Log } from "@/util/log"
|
||||
|
||||
export namespace EffectLogger {
|
||||
type Fields = Record<string, unknown>
|
||||
|
||||
export interface Handle {
|
||||
readonly debug: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
|
||||
readonly info: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
|
||||
readonly warn: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
|
||||
readonly error: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
|
||||
readonly with: (extra: Fields) => Handle
|
||||
}
|
||||
|
||||
const clean = (input?: Fields): Fields =>
|
||||
Object.fromEntries(Object.entries(input ?? {}).filter((entry) => entry[1] !== undefined && entry[1] !== null))
|
||||
|
||||
const text = (input: unknown): string => {
|
||||
if (Array.isArray(input)) return input.map((item) => String(item)).join(" ")
|
||||
return input === undefined ? "" : String(input)
|
||||
}
|
||||
|
||||
const call = (run: (msg?: unknown) => Effect.Effect<void>, base: Fields, msg?: unknown, extra?: Fields) => {
|
||||
const ann = clean({ ...base, ...extra })
|
||||
const fx = run(msg)
|
||||
return Object.keys(ann).length ? Effect.annotateLogs(fx, ann) : fx
|
||||
}
|
||||
|
||||
export const logger = Logger.make((opts) => {
|
||||
const extra = clean(opts.fiber.getRef(References.CurrentLogAnnotations))
|
||||
const now = opts.date.getTime()
|
||||
for (const [key, start] of opts.fiber.getRef(References.CurrentLogSpans)) {
|
||||
extra[`logSpan.${key}`] = `${now - start}ms`
|
||||
}
|
||||
if (opts.cause.reasons.length > 0) {
|
||||
extra.cause = Cause.pretty(opts.cause)
|
||||
}
|
||||
|
||||
const svc = typeof extra.service === "string" ? extra.service : undefined
|
||||
if (svc) delete extra.service
|
||||
const log = svc ? Log.create({ service: svc }) : Log.Default
|
||||
const msg = text(opts.message)
|
||||
|
||||
switch (opts.logLevel) {
|
||||
case "Trace":
|
||||
case "Debug":
|
||||
return log.debug(msg, extra)
|
||||
case "Warn":
|
||||
return log.warn(msg, extra)
|
||||
case "Error":
|
||||
case "Fatal":
|
||||
return log.error(msg, extra)
|
||||
default:
|
||||
return log.info(msg, extra)
|
||||
}
|
||||
})
|
||||
|
||||
export const layer = Logger.layer([logger], { mergeWithExisting: true })
|
||||
|
||||
export const create = (base: Fields = {}): Handle => ({
|
||||
debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra),
|
||||
info: (msg, extra) => call((item) => Effect.logInfo(item), base, msg, extra),
|
||||
warn: (msg, extra) => call((item) => Effect.logWarning(item), base, msg, extra),
|
||||
error: (msg, extra) => call((item) => Effect.logError(item), base, msg, extra),
|
||||
with: (extra) => create({ ...base, ...extra }),
|
||||
})
|
||||
}
|
||||
@@ -1,34 +1,45 @@
|
||||
import { Layer } from "effect"
|
||||
import { Duration, Layer } from "effect"
|
||||
import { FetchHttpClient } from "effect/unstable/http"
|
||||
import { Otlp } from "effect/unstable/observability"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { CHANNEL, VERSION } from "@/installation/meta"
|
||||
|
||||
export namespace Observability {
|
||||
export const enabled = !!Flag.OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
|
||||
export const layer = !Flag.OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
? Layer.empty
|
||||
: Otlp.layerJson({
|
||||
baseUrl: Flag.OTEL_EXPORTER_OTLP_ENDPOINT,
|
||||
loggerMergeWithExisting: false,
|
||||
resource: {
|
||||
serviceName: "opencode",
|
||||
serviceVersion: VERSION,
|
||||
attributes: {
|
||||
"deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL,
|
||||
"opencode.client": Flag.OPENCODE_CLIENT,
|
||||
},
|
||||
const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
|
||||
const resource = {
|
||||
serviceName: "opencode",
|
||||
serviceVersion: VERSION,
|
||||
attributes: {
|
||||
"deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL,
|
||||
"opencode.client": Flag.OPENCODE_CLIENT,
|
||||
},
|
||||
}
|
||||
|
||||
const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS
|
||||
? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce(
|
||||
(acc, x) => {
|
||||
const [key, value] = x.split("=")
|
||||
acc[key] = value
|
||||
return acc
|
||||
},
|
||||
headers: Flag.OTEL_EXPORTER_OTLP_HEADERS
|
||||
? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce(
|
||||
(acc, x) => {
|
||||
const [key, value] = x.split("=")
|
||||
acc[key] = value
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, string>,
|
||||
)
|
||||
: undefined,
|
||||
}).pipe(Layer.provide(FetchHttpClient.layer))
|
||||
{} as Record<string, string>,
|
||||
)
|
||||
: undefined
|
||||
|
||||
export const layer = !base
|
||||
? EffectLogger.layer
|
||||
: Layer.mergeAll(
|
||||
EffectLogger.layer,
|
||||
Otlp.layerJson({
|
||||
baseUrl: base,
|
||||
loggerExportInterval: Duration.seconds(5),
|
||||
loggerMergeWithExisting: true,
|
||||
resource,
|
||||
headers,
|
||||
}),
|
||||
).pipe(Layer.provide(FetchHttpClient.layer))
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import { Config } from "@/config/config"
|
||||
import { Permission } from "@/permission"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { Log } from "@/util/log"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { Session } from "."
|
||||
import { LLM } from "./llm"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
@@ -23,7 +23,7 @@ import { isRecord } from "@/util/record"
|
||||
|
||||
export namespace SessionProcessor {
|
||||
const DOOM_LOOP_THRESHOLD = 3
|
||||
const log = Log.create({ service: "session.processor" })
|
||||
const log = EffectLogger.create({ service: "session.processor" })
|
||||
|
||||
export type Result = "compact" | "stop" | "continue"
|
||||
|
||||
@@ -121,6 +121,7 @@ export namespace SessionProcessor {
|
||||
reasoningMap: {},
|
||||
}
|
||||
let aborted = false
|
||||
const slog = log.with({ sessionID: input.sessionID, messageID: input.assistantMessage.id })
|
||||
|
||||
const parse = (e: unknown) =>
|
||||
MessageV2.fromError(e, {
|
||||
@@ -448,7 +449,7 @@ export namespace SessionProcessor {
|
||||
return
|
||||
|
||||
default:
|
||||
log.info("unhandled", { ...value })
|
||||
yield* slog.info("unhandled", { event: value.type, value })
|
||||
return
|
||||
}
|
||||
})
|
||||
@@ -514,7 +515,7 @@ export namespace SessionProcessor {
|
||||
})
|
||||
|
||||
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
|
||||
log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined })
|
||||
yield* slog.error("process", { error: errorMessage(e), stack: e instanceof Error ? e.stack : undefined })
|
||||
const error = parse(e)
|
||||
if (MessageV2.ContextOverflowError.isInstance(error)) {
|
||||
ctx.needsCompaction = true
|
||||
@@ -530,7 +531,7 @@ export namespace SessionProcessor {
|
||||
})
|
||||
|
||||
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
|
||||
log.info("process")
|
||||
yield* slog.info("process")
|
||||
ctx.needsCompaction = false
|
||||
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ import { Truncate } from "@/tool/truncate"
|
||||
import { decodeDataUrl } from "@/util/data-url"
|
||||
import { Process } from "@/util/process"
|
||||
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { TaskTool, type TaskPromptOps } from "@/tool/task"
|
||||
@@ -64,6 +65,7 @@ const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested struc
|
||||
|
||||
export namespace SessionPrompt {
|
||||
const log = Log.create({ service: "session.prompt" })
|
||||
const elog = EffectLogger.create({ service: "session.prompt" })
|
||||
|
||||
export interface Interface {
|
||||
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
|
||||
@@ -102,7 +104,7 @@ export namespace SessionPrompt {
|
||||
const revert = yield* SessionRevert.Service
|
||||
|
||||
const cancel = Effect.fn("SessionPrompt.cancel")(function* (sessionID: SessionID) {
|
||||
log.info("cancel", { sessionID })
|
||||
yield* elog.info("cancel", { sessionID })
|
||||
yield* state.cancel(sessionID)
|
||||
})
|
||||
|
||||
@@ -196,11 +198,7 @@ export namespace SessionPrompt {
|
||||
const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
|
||||
yield* sessions
|
||||
.setTitle({ sessionID: input.session.id, title: t })
|
||||
.pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => log.error("failed to generate title", { error: Cause.squash(cause) })),
|
||||
),
|
||||
)
|
||||
.pipe(Effect.catchCause((cause) => elog.error("failed to generate title", { error: Cause.squash(cause) })))
|
||||
})
|
||||
|
||||
const insertReminders = Effect.fn("SessionPrompt.insertReminders")(function* (input: {
|
||||
@@ -1302,13 +1300,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
|
||||
function* (sessionID: SessionID) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const slog = elog.with({ sessionID })
|
||||
let structured: unknown | undefined
|
||||
let step = 0
|
||||
const session = yield* sessions.get(sessionID)
|
||||
|
||||
while (true) {
|
||||
yield* status.set(sessionID, { type: "busy" })
|
||||
log.info("loop", { step, sessionID })
|
||||
yield* slog.info("loop", { step })
|
||||
|
||||
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
|
||||
|
||||
@@ -1344,7 +1343,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
!hasToolCalls &&
|
||||
lastUser.id < lastAssistant.id
|
||||
) {
|
||||
log.info("exiting loop", { sessionID })
|
||||
yield* slog.info("exiting loop")
|
||||
break
|
||||
}
|
||||
|
||||
@@ -1540,7 +1539,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
)
|
||||
|
||||
const command = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) {
|
||||
log.info("command", input)
|
||||
yield* elog.info("command", { sessionID: input.sessionID, command: input.command, agent: input.agent })
|
||||
const cmd = yield* commands.get(input.command)
|
||||
if (!cmd) {
|
||||
const available = (yield* commands.list()).map((c) => c.name)
|
||||
|
||||
Reference in New Issue
Block a user