Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
53136cf57f tweak: improve motel observability for session flows 2026-04-10 22:11:38 -04:00
6 changed files with 152 additions and 10 deletions

View File

@@ -55,7 +55,7 @@ export namespace EffectLogger {
}
})
export const layer = Logger.layer([logger], { mergeWithExisting: false })
export const layer = Logger.layer([Logger.tracerLogger, logger], { mergeWithExisting: false })
export const create = (base: Fields = {}): Handle => ({
debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra),

View File

@@ -6,7 +6,7 @@ import { Flag } from "@/flag/flag"
import { CHANNEL, VERSION } from "@/installation/meta"
export namespace Observability {
const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT ?? (CHANNEL === "local" ? "http://127.0.0.1:27686" : undefined)
export const enabled = !!base
const resource = {
@@ -34,6 +34,7 @@ export namespace Observability {
: Otlp.layerJson({
baseUrl: base,
loggerExportInterval: Duration.seconds(1),
tracerExportInterval: Duration.seconds(1),
loggerMergeWithExisting: true,
resource,
headers,

View File

@@ -123,6 +123,14 @@ export namespace SessionProcessor {
let aborted = false
const slog = log.with({ sessionID: input.sessionID, messageID: input.assistantMessage.id })
yield* Effect.annotateCurrentSpan({
sessionID: input.sessionID,
messageID: input.assistantMessage.id,
agent: input.assistantMessage.agent,
providerID: input.model.providerID,
modelID: input.model.id,
})
const parse = (e: unknown) =>
MessageV2.fromError(e, {
providerID: input.model.providerID,
@@ -531,7 +539,18 @@ export namespace SessionProcessor {
})
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
yield* slog.info("process")
yield* Effect.annotateCurrentSpan({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
agent: ctx.assistantMessage.agent,
providerID: ctx.model.providerID,
modelID: ctx.model.id,
})
yield* slog.info("process", {
agent: ctx.assistantMessage.agent,
providerID: ctx.model.providerID,
modelID: ctx.model.id,
})
ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
@@ -545,6 +564,17 @@ export namespace SessionProcessor {
Stream.tap((event) => handleEvent(event)),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
Effect.withSpan(
"SessionProcessor.stream",
{
attributes: {
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
agent: ctx.assistantMessage.agent,
},
},
{ captureStackTrace: false },
),
)
}).pipe(
Effect.onInterrupt(() =>
@@ -575,8 +605,19 @@ export namespace SessionProcessor {
Effect.ensuring(cleanup()),
)
if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error) return "stop"
if (ctx.needsCompaction) {
yield* slog.warn("compact", { finish: ctx.assistantMessage.finish, blocked: ctx.blocked })
return "compact"
}
if (ctx.blocked || ctx.assistantMessage.error) {
yield* slog.warn("stop", {
blocked: ctx.blocked,
finish: ctx.assistantMessage.finish,
hasError: !!ctx.assistantMessage.error,
})
return "stop"
}
yield* slog.info("continue", { finish: ctx.assistantMessage.finish })
return "continue"
})
})

View File

@@ -398,6 +398,17 @@ NOTE: At any point in time through this workflow you should feel free to ask the
return Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, options)
yield* Effect.annotateCurrentSpan({
tool: item.id,
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
callID: ctx.callID,
})
yield* elog.info("tool.start", {
tool: item.id,
sessionID: ctx.sessionID,
callID: ctx.callID,
})
yield* plugin.trigger(
"tool.execute.before",
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
@@ -421,8 +432,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
if (options.abortSignal?.aborted) {
yield* input.processor.completeToolCall(options.toolCallId, output)
}
yield* elog.info("tool.done", {
tool: item.id,
sessionID: ctx.sessionID,
callID: ctx.callID,
truncated: output.metadata.truncated,
})
return output
}),
}).pipe(
Effect.withSpan(
`Tool.${item.id}`,
{
attributes: {
tool: item.id,
sessionID: input.session.id,
messageID: input.processor.message.id,
callID: options.toolCallId,
},
},
{ captureStackTrace: false },
),
),
)
},
})
@@ -439,6 +469,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
Effect.runPromise(
Effect.gen(function* () {
const ctx = context(args, opts)
yield* Effect.annotateCurrentSpan({
tool: key,
sessionID: ctx.sessionID,
messageID: input.processor.message.id,
callID: ctx.callID,
})
yield* elog.info("tool.start", { tool: key, sessionID: ctx.sessionID, callID: ctx.callID })
yield* plugin.trigger(
"tool.execute.before",
{ tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
@@ -500,8 +537,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
if (opts.abortSignal?.aborted) {
yield* input.processor.completeToolCall(opts.toolCallId, output)
}
yield* elog.info("tool.done", {
tool: key,
sessionID: ctx.sessionID,
callID: ctx.callID,
truncated: output.metadata.truncated,
})
return output
}),
}).pipe(
Effect.withSpan(
`Tool.${key}`,
{
attributes: {
tool: key,
sessionID: input.session.id,
messageID: input.processor.message.id,
callID: opts.toolCallId,
},
},
{ captureStackTrace: false },
),
),
)
tools[key] = item
}
@@ -1327,6 +1383,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
yield* Effect.annotateCurrentSpan({
sessionID,
step,
agent: lastUser.agent,
providerID: lastUser.model.providerID,
modelID: lastUser.model.modelID,
})
const lastAssistantMsg = msgs.findLast(
(msg) => msg.info.role === "assistant" && msg.info.id === lastAssistant?.id,
)
@@ -1348,6 +1412,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}
step++
yield* slog.info("step", {
step,
agent: lastUser.agent,
providerID: lastUser.model.providerID,
modelID: lastUser.model.modelID,
})
if (step === 1)
yield* title({
session,
@@ -1365,6 +1435,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}
if (task?.type === "compaction") {
yield* slog.warn("compaction", { step, auto: task.auto, overflow: task.overflow })
const result = yield* compaction.process({
messages: msgs,
parentID: lastUser.id,
@@ -1469,7 +1540,21 @@ NOTE: At any point in time through this workflow you should feel free to ask the
Effect.promise(() => SystemPrompt.environment(model)),
instruction.system().pipe(Effect.orDie),
MessageV2.toModelMessagesEffect(msgs, model),
])
]).pipe(
Effect.withSpan(
"SessionPrompt.prepareInput",
{
attributes: {
sessionID,
step,
agent: agent.name,
providerID: model.providerID,
modelID: model.id,
},
},
{ captureStackTrace: false },
),
)
const system = [...env, ...(skills ? [skills] : []), ...instructions]
const format = lastUser.format ?? { type: "text" as const }
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)

View File

@@ -24,6 +24,7 @@ export const WriteTool = Tool.defineEffect(
const lsp = yield* LSP.Service
const fs = yield* AppFileSystem.Service
const filetime = yield* FileTime.Service
const format = yield* Format.Service
return {
description: DESCRIPTION,
@@ -56,7 +57,7 @@ export const WriteTool = Tool.defineEffect(
)
yield* fs.writeWithDirs(filepath, params.content)
yield* Effect.promise(() => Format.file(filepath))
yield* format.file(filepath)
Bus.publish(File.Event.Edited, { file: filepath })
yield* Effect.promise(() =>
Bus.publish(FileWatcher.Event.Updated, {

View File

@@ -7,6 +7,7 @@ import { Instance } from "../../src/project/instance"
import { LSP } from "../../src/lsp"
import { AppFileSystem } from "../../src/filesystem"
import { FileTime } from "../../src/file/time"
import { Format } from "../../src/format"
import { Tool } from "../../src/tool/tool"
import { SessionID, MessageID } from "../../src/session/schema"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
@@ -29,7 +30,20 @@ afterEach(async () => {
})
const it = testEffect(
Layer.mergeAll(LSP.defaultLayer, AppFileSystem.defaultLayer, FileTime.defaultLayer, CrossSpawnSpawner.defaultLayer),
Layer.mergeAll(
LSP.defaultLayer,
AppFileSystem.defaultLayer,
FileTime.defaultLayer,
CrossSpawnSpawner.defaultLayer,
Layer.succeed(
Format.Service,
Format.Service.of({
init: () => Effect.void,
status: () => Effect.succeed([]),
file: () => Effect.void,
}),
),
),
)
const init = Effect.fn("WriteToolTest.init")(function* () {