Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
9b8dbeda78 refactor(session): effectify SessionSummary service
Convert SessionSummary to Effect service pattern with Interface,
Service, Layer, and async facades. Use Session.Service and Bus.Service
directly instead of facades.
2026-03-30 21:36:54 -04:00
3 changed files with 119 additions and 105 deletions

View File

@@ -436,13 +436,13 @@ export const SessionRoutes = lazy(() =>
validator(
"param",
z.object({
sessionID: SessionSummary.diff.schema.shape.sessionID,
sessionID: SessionSummary.DiffInput.shape.sessionID,
}),
),
validator(
"query",
z.object({
messageID: SessionSummary.diff.schema.shape.messageID,
messageID: SessionSummary.DiffInput.shape.messageID,
}),
),
async (c) => {

View File

@@ -294,12 +294,10 @@ export namespace SessionProcessor {
}
ctx.snapshot = undefined
}
yield* Effect.promise(() =>
SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
}),
).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach)
SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })

View File

@@ -1,14 +1,12 @@
import { fn } from "@/util/fn"
import z from "zod"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { Bus } from "@/bus"
import { Snapshot } from "@/snapshot"
import { Storage } from "@/storage/storage"
import { Session } from "."
import { MessageV2 } from "./message-v2"
import { SessionID, MessageID } from "./schema"
import { Snapshot } from "@/snapshot"
import { Storage } from "@/storage/storage"
import { Bus } from "@/bus"
import { NotFoundError } from "@/storage/db"
export namespace SessionSummary {
function unquoteGitPath(input: string) {
@@ -67,103 +65,121 @@ export namespace SessionSummary {
return Buffer.from(bytes).toString()
}
export const summarize = fn(
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
}),
async (input) => {
await Session.messages({ sessionID: input.sessionID })
.then((all) =>
Promise.all([
summarizeSession({ sessionID: input.sessionID, messages: all }),
summarizeMessage({ messageID: input.messageID, messages: all }),
]),
)
.catch((err) => {
if (NotFoundError.isInstance(err)) return
throw err
})
},
)
async function summarizeSession(input: { sessionID: SessionID; messages: MessageV2.WithParts[] }) {
const diffs = await computeDiff({ messages: input.messages })
await Session.setSummary({
sessionID: input.sessionID,
summary: {
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
files: diffs.length,
},
})
await Storage.write(["session_diff", input.sessionID], diffs)
Bus.publish(Session.Event.Diff, {
sessionID: input.sessionID,
diff: diffs,
})
export interface Interface {
readonly summarize: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<void>
readonly diff: (input: { sessionID: SessionID; messageID?: MessageID }) => Effect.Effect<Snapshot.FileDiff[]>
readonly computeDiff: (input: { messages: MessageV2.WithParts[] }) => Effect.Effect<Snapshot.FileDiff[]>
}
async function summarizeMessage(input: { messageID: string; messages: MessageV2.WithParts[] }) {
const messages = input.messages.filter(
(m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
)
const msgWithParts = messages.find((m) => m.info.id === input.messageID)
if (!msgWithParts || msgWithParts.info.role !== "user") return
const userMsg = msgWithParts.info
const diffs = await computeDiff({ messages })
userMsg.summary = {
...userMsg.summary,
diffs,
}
await Session.updateMessage(userMsg)
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionSummary") {}
export const diff = fn(
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod.optional(),
}),
async (input) => {
const diffs = await Storage.read<Snapshot.FileDiff[]>(["session_diff", input.sessionID]).catch(() => [])
const next = diffs.map((item) => {
const file = unquoteGitPath(item.file)
if (file === item.file) return item
return {
...item,
file,
}
})
const changed = next.some((item, i) => item.file !== diffs[i]?.file)
if (changed) Storage.write(["session_diff", input.sessionID], next).catch(() => {})
return next
},
)
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const sessions = yield* Session.Service
const snapshot = yield* Snapshot.Service
const storage = yield* Storage.Service
const bus = yield* Bus.Service
export async function computeDiff(input: { messages: MessageV2.WithParts[] }) {
let from: string | undefined
let to: string | undefined
// scan assistant messages to find earliest from and latest to
// snapshot
for (const item of input.messages) {
if (!from) {
for (const part of item.parts) {
if (part.type === "step-start" && part.snapshot) {
from = part.snapshot
break
const computeDiff = Effect.fn("SessionSummary.computeDiff")(function* (input: {
messages: MessageV2.WithParts[]
}) {
let from: string | undefined
let to: string | undefined
for (const item of input.messages) {
if (!from) {
for (const part of item.parts) {
if (part.type === "step-start" && part.snapshot) {
from = part.snapshot
break
}
}
}
for (const part of item.parts) {
if (part.type === "step-finish" && part.snapshot) to = part.snapshot
}
}
}
if (from && to) return yield* snapshot.diffFull(from, to)
return []
})
for (const part of item.parts) {
if (part.type === "step-finish" && part.snapshot) {
to = part.snapshot
}
}
}
const summarize = Effect.fn("SessionSummary.summarize")(function* (input: {
sessionID: SessionID
messageID: MessageID
}) {
const all = yield* sessions.messages({ sessionID: input.sessionID })
if (!all.length) return
if (from && to) return Snapshot.diffFull(from, to)
return []
const diffs = yield* computeDiff({ messages: all })
yield* sessions.setSummary({
sessionID: input.sessionID,
summary: {
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
files: diffs.length,
},
})
yield* storage.write(["session_diff", input.sessionID], diffs).pipe(Effect.ignore)
yield* bus.publish(Session.Event.Diff, { sessionID: input.sessionID, diff: diffs })
const messages = all.filter(
(m) =>
m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
)
const target = messages.find((m) => m.info.id === input.messageID)
if (!target || target.info.role !== "user") return
const msgDiffs = yield* computeDiff({ messages })
target.info.summary = { ...target.info.summary, diffs: msgDiffs }
yield* sessions.updateMessage(target.info)
})
const diff = Effect.fn("SessionSummary.diff")(function* (input: {
sessionID: SessionID
messageID?: MessageID
}) {
const diffs = yield* storage.read<Snapshot.FileDiff[]>(["session_diff", input.sessionID]).pipe(
Effect.catch(() => Effect.succeed([] as Snapshot.FileDiff[])),
)
const next = diffs.map((item) => {
const file = unquoteGitPath(item.file)
if (file === item.file) return item
return { ...item, file }
})
const changed = next.some((item, i) => item.file !== diffs[i]?.file)
if (changed) yield* storage.write(["session_diff", input.sessionID], next).pipe(Effect.ignore)
return next
})
return Service.of({ summarize, diff, computeDiff })
}),
)
export const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provide(Snapshot.defaultLayer),
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export const summarize = (input: { sessionID: SessionID; messageID: MessageID }) =>
void runPromise((svc) => svc.summarize(input)).catch(() => {})
export const DiffInput = z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod.optional(),
})
export async function diff(input: z.infer<typeof DiffInput>) {
return runPromise((svc) => svc.diff(input))
}
export async function computeDiff(input: { messages: MessageV2.WithParts[] }) {
return runPromise((svc) => svc.computeDiff(input))
}
}