mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-14 16:42:38 +00:00
feat(compaction): serialize compaction tail (#26830)
This commit is contained in:
@@ -273,10 +273,10 @@ export const Info = Schema.Struct({
|
||||
}),
|
||||
tail_turns: Schema.optional(NonNegativeInt).annotate({
|
||||
description:
|
||||
"Number of recent user turns, including their following assistant/tool responses, to keep verbatim during compaction (default: 2)",
|
||||
"Number of recent user turns, including their following assistant/tool responses, to serialize into the compaction summary (default: 2)",
|
||||
}),
|
||||
preserve_recent_tokens: Schema.optional(NonNegativeInt).annotate({
|
||||
description: "Maximum number of tokens from recent turns to preserve verbatim after compaction",
|
||||
description: "Maximum number of tokens from recent turns to serialize into the compaction summary",
|
||||
}),
|
||||
reserved: Schema.optional(NonNegativeInt).annotate({
|
||||
description: "Token buffer for compaction. Leaves enough window to avoid overflow during compaction.",
|
||||
|
||||
@@ -79,12 +79,10 @@ Rules:
|
||||
type Turn = {
|
||||
start: number
|
||||
end: number
|
||||
id: MessageID
|
||||
}
|
||||
|
||||
type Tail = {
|
||||
start: number
|
||||
id: MessageID
|
||||
}
|
||||
|
||||
type CompletedCompaction = {
|
||||
@@ -121,19 +119,41 @@ function completedCompactions(messages: MessageV2.WithParts[]) {
|
||||
})
|
||||
}
|
||||
|
||||
function buildPrompt(input: { previousSummary?: string; context: string[] }) {
|
||||
function buildPrompt(input: { previousSummary?: string; context: string[]; tail?: string }) {
|
||||
const source = input.tail
|
||||
? "the conversation history above and the serialized recent conversation tail below"
|
||||
: "the conversation history above"
|
||||
const anchor = input.previousSummary
|
||||
? [
|
||||
"Update the anchored summary below using the conversation history above.",
|
||||
`Update the anchored summary below using ${source}.`,
|
||||
"Preserve still-true details, remove stale details, and merge in the new facts.",
|
||||
"<previous-summary>",
|
||||
input.previousSummary,
|
||||
"</previous-summary>",
|
||||
].join("\n")
|
||||
: "Create a new anchored summary from the conversation history above."
|
||||
return [anchor, SUMMARY_TEMPLATE, ...input.context].join("\n\n")
|
||||
: `Create a new anchored summary from ${source}.`
|
||||
const tail = input.tail
|
||||
? [
|
||||
"Fold this serialized recent conversation tail into the summary; it is not provider message history.",
|
||||
"<recent-conversation-tail>",
|
||||
input.tail,
|
||||
"</recent-conversation-tail>",
|
||||
].join("\n")
|
||||
: undefined
|
||||
return [anchor, ...(tail ? [tail] : []), SUMMARY_TEMPLATE, ...input.context].join("\n\n")
|
||||
}
|
||||
|
||||
const serialize = Effect.fn("SessionCompaction.serialize")(function* (input: {
|
||||
messages: MessageV2.WithParts[]
|
||||
model: Provider.Model
|
||||
}) {
|
||||
const messages = yield* MessageV2.toModelMessagesEffect(input.messages, input.model, {
|
||||
stripMedia: true,
|
||||
toolOutputMaxChars: TOOL_OUTPUT_MAX_CHARS,
|
||||
})
|
||||
return messages.length ? JSON.stringify(messages, null, 2) : undefined
|
||||
})
|
||||
|
||||
function preserveRecentBudget(input: { cfg: Config.Info; model: Provider.Model }) {
|
||||
return (
|
||||
input.cfg.compaction?.preserve_recent_tokens ??
|
||||
@@ -150,7 +170,6 @@ function turns(messages: MessageV2.WithParts[]) {
|
||||
result.push({
|
||||
start: i,
|
||||
end: messages.length,
|
||||
id: msg.info.id,
|
||||
})
|
||||
}
|
||||
for (let i = 0; i < result.length - 1; i++) {
|
||||
@@ -177,7 +196,6 @@ function splitTurn(input: {
|
||||
if (size > input.budget) continue
|
||||
return {
|
||||
start,
|
||||
id: input.messages[start]!.info.id,
|
||||
} satisfies Tail
|
||||
}
|
||||
return undefined
|
||||
@@ -244,8 +262,7 @@ export const layer: Layer.Layer<
|
||||
messages: MessageV2.WithParts[]
|
||||
model: Provider.Model
|
||||
}) {
|
||||
const msgs = yield* MessageV2.toModelMessagesEffect(input.messages, input.model)
|
||||
return Token.estimate(JSON.stringify(msgs))
|
||||
return Token.estimate((yield* serialize(input)) ?? "")
|
||||
})
|
||||
|
||||
const select = Effect.fn("SessionCompaction.select")(function* (input: {
|
||||
@@ -254,10 +271,10 @@ export const layer: Layer.Layer<
|
||||
model: Provider.Model
|
||||
}) {
|
||||
const limit = input.cfg.compaction?.tail_turns ?? DEFAULT_TAIL_TURNS
|
||||
if (limit <= 0) return { head: input.messages, tail_start_id: undefined }
|
||||
if (limit <= 0) return { head: input.messages, tail: [] }
|
||||
const budget = preserveRecentBudget({ cfg: input.cfg, model: input.model })
|
||||
const all = turns(input.messages)
|
||||
if (!all.length) return { head: input.messages, tail_start_id: undefined }
|
||||
if (!all.length) return { head: input.messages, tail: [] }
|
||||
const recent = all.slice(-limit)
|
||||
const sizes = yield* Effect.forEach(
|
||||
recent,
|
||||
@@ -276,7 +293,7 @@ export const layer: Layer.Layer<
|
||||
const size = sizes[i]
|
||||
if (total + size <= budget) {
|
||||
total += size
|
||||
keep = { start: turn.start, id: turn.id }
|
||||
keep = { start: turn.start }
|
||||
continue
|
||||
}
|
||||
const remaining = budget - total
|
||||
@@ -292,10 +309,10 @@ export const layer: Layer.Layer<
|
||||
break
|
||||
}
|
||||
|
||||
if (!keep || keep.start === 0) return { head: input.messages, tail_start_id: undefined }
|
||||
if (!keep) return { head: input.messages, tail: [] }
|
||||
return {
|
||||
head: input.messages.slice(0, keep.start),
|
||||
tail_start_id: keep.id,
|
||||
tail: input.messages.slice(keep.start),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -406,7 +423,10 @@ export const layer: Layer.Layer<
|
||||
{ sessionID: input.sessionID },
|
||||
{ context: [], prompt: undefined },
|
||||
)
|
||||
const nextPrompt = compacting.prompt ?? buildPrompt({ previousSummary, context: compacting.context })
|
||||
const tailMessages = structuredClone(selected.tail)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: tailMessages })
|
||||
const tail = yield* serialize({ messages: tailMessages, model })
|
||||
const nextPrompt = compacting.prompt ?? buildPrompt({ previousSummary, context: compacting.context, tail })
|
||||
const msgs = structuredClone(selected.head)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, {
|
||||
@@ -473,13 +493,6 @@ export const layer: Layer.Layer<
|
||||
return "stop"
|
||||
}
|
||||
|
||||
if (compactionPart && selected.tail_start_id && compactionPart.tail_start_id !== selected.tail_start_id) {
|
||||
yield* session.updatePart({
|
||||
...compactionPart,
|
||||
tail_start_id: selected.tail_start_id,
|
||||
})
|
||||
}
|
||||
|
||||
if (result === "continue" && input.auto) {
|
||||
if (replay) {
|
||||
const original = replay.info
|
||||
@@ -575,7 +588,6 @@ export const layer: Layer.Layer<
|
||||
sessionID: input.sessionID,
|
||||
timestamp: DateTime.makeUnsafe(Date.now()),
|
||||
text: summary ?? "",
|
||||
include: selected.tail_start_id,
|
||||
})
|
||||
}
|
||||
yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
|
||||
|
||||
@@ -840,12 +840,13 @@ export const toModelMessagesEffect = Effect.fnUntraced(function* (
|
||||
return part.metadata?.anthropic?.signature != null
|
||||
})
|
||||
for (const part of msg.parts) {
|
||||
if (msg.info.summary && part.type !== "text") continue
|
||||
if (part.type === "text") {
|
||||
const text = part.text === "" && hasSignedReasoning ? " " : part.text
|
||||
assistantMessage.parts.push({
|
||||
type: "text",
|
||||
text,
|
||||
...(differentModel ? {} : { providerMetadata: part.metadata }),
|
||||
...(differentModel || msg.info.summary ? {} : { providerMetadata: part.metadata }),
|
||||
})
|
||||
}
|
||||
if (part.type === "step-start")
|
||||
@@ -1071,53 +1072,16 @@ export function get(input: { sessionID: SessionID; messageID: MessageID }): With
|
||||
export function filterCompacted(msgs: Iterable<WithParts>) {
|
||||
const result = [] as WithParts[]
|
||||
const completed = new Set<string>()
|
||||
let retain: MessageID | undefined
|
||||
for (const msg of msgs) {
|
||||
result.push(msg)
|
||||
if (retain) {
|
||||
if (msg.info.id === retain) break
|
||||
continue
|
||||
}
|
||||
if (msg.info.role === "user" && completed.has(msg.info.id)) {
|
||||
const part = msg.parts.find((item): item is CompactionPart => item.type === "compaction")
|
||||
if (!part) continue
|
||||
if (!part.tail_start_id) break
|
||||
retain = part.tail_start_id
|
||||
if (msg.info.id === retain) break
|
||||
if (msg.parts.some((item): item is CompactionPart => item.type === "compaction")) break
|
||||
continue
|
||||
}
|
||||
if (msg.info.role === "user" && completed.has(msg.info.id) && msg.parts.some((part) => part.type === "compaction"))
|
||||
break
|
||||
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
|
||||
completed.add(msg.info.parentID)
|
||||
}
|
||||
result.reverse()
|
||||
const compactionIndex = result.findLastIndex(
|
||||
(msg) =>
|
||||
msg.info.role === "user" &&
|
||||
msg.parts.some((item): item is CompactionPart => item.type === "compaction" && item.tail_start_id !== undefined),
|
||||
)
|
||||
const compaction = result[compactionIndex]
|
||||
const part = compaction?.parts.find(
|
||||
(item): item is CompactionPart => item.type === "compaction" && item.tail_start_id !== undefined,
|
||||
)
|
||||
const summaryIndex = compaction
|
||||
? result.findIndex(
|
||||
(msg, index) =>
|
||||
index > compactionIndex &&
|
||||
msg.info.role === "assistant" &&
|
||||
msg.info.summary &&
|
||||
msg.info.parentID === compaction.info.id,
|
||||
)
|
||||
: -1
|
||||
const tailIndex = part?.tail_start_id ? result.findIndex((msg) => msg.info.id === part.tail_start_id) : -1
|
||||
if (tailIndex >= 0 && tailIndex < compactionIndex && summaryIndex > compactionIndex) {
|
||||
return [
|
||||
...result.slice(compactionIndex, summaryIndex + 1),
|
||||
...result.slice(tailIndex, compactionIndex),
|
||||
...result.slice(summaryIndex + 1),
|
||||
]
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
@@ -926,12 +926,12 @@ describe("session.compaction.process", () => {
|
||||
)
|
||||
|
||||
itCompaction.instance(
|
||||
"persists tail_start_id for retained recent turns",
|
||||
"does not persist tail_start_id for serialized recent turns",
|
||||
Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
yield* createUserMessage(session.id, "first")
|
||||
const keep = yield* createUserMessage(session.id, "second")
|
||||
yield* createUserMessage(session.id, "second")
|
||||
yield* createUserMessage(session.id, "third")
|
||||
yield* createSummaryCompaction(session.id)
|
||||
|
||||
@@ -947,18 +947,18 @@ describe("session.compaction.process", () => {
|
||||
|
||||
const part = yield* readCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
expect(part?.tail_start_id).toBeUndefined()
|
||||
}).pipe(withCompaction({ config: cfg({ tail_turns: 2, preserve_recent_tokens: 10_000 }) })),
|
||||
)
|
||||
|
||||
itCompaction.instance(
|
||||
"shrinks retained tail to fit preserve token budget",
|
||||
"does not persist tail_start_id when shrinking serialized tail",
|
||||
Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
yield* createUserMessage(session.id, "first")
|
||||
yield* createUserMessage(session.id, "x".repeat(2_000))
|
||||
const keep = yield* createUserMessage(session.id, "tiny")
|
||||
yield* createUserMessage(session.id, "tiny")
|
||||
yield* createSummaryCompaction(session.id)
|
||||
|
||||
const msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
@@ -973,7 +973,7 @@ describe("session.compaction.process", () => {
|
||||
|
||||
const part = yield* readCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
expect(part?.tail_start_id).toBeUndefined()
|
||||
}).pipe(withCompaction({ config: cfg({ tail_turns: 2, preserve_recent_tokens: 100 }) })),
|
||||
)
|
||||
|
||||
@@ -1005,7 +1005,7 @@ describe("session.compaction.process", () => {
|
||||
)
|
||||
|
||||
itCompaction.instance(
|
||||
"falls back to full summary when retained tail media exceeds preserve token budget",
|
||||
"serializes retained tail media as text in the summary input",
|
||||
() => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
@@ -1078,15 +1078,16 @@ describe("session.compaction.process", () => {
|
||||
|
||||
const part = yield* readCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
expect(part?.tail_start_id).toBeUndefined()
|
||||
expect(captured).toContain("zzzz")
|
||||
expect(captured).not.toContain("keep tail")
|
||||
expect(captured).toContain("keep tail")
|
||||
|
||||
const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
expect(filtered.map((msg) => msg.info.id).slice(0, 3)).toEqual([parent!, expect.any(String), keep.id])
|
||||
expect(filtered.map((msg) => msg.info.id)).toEqual([parent!, expect.any(String)])
|
||||
expect(filtered[1]?.info.role).toBe("assistant")
|
||||
expect(filtered[1]?.info.role === "assistant" ? filtered[1].info.summary : false).toBe(true)
|
||||
expect(filtered.map((msg) => msg.info.id)).not.toContain(large.id)
|
||||
expect(filtered.map((msg) => msg.info.id)).not.toContain(keep.id)
|
||||
}).pipe(withCompaction({ llm: stub.layer, config: cfg({ tail_turns: 1, preserve_recent_tokens: 100 }) }))
|
||||
},
|
||||
{ git: true },
|
||||
@@ -1353,13 +1354,13 @@ describe("session.compaction.process", () => {
|
||||
)
|
||||
|
||||
itCompaction.instance(
|
||||
"summarizes only the head while keeping recent tail out of summary input",
|
||||
"summarizes the head while serializing recent tail into summary input",
|
||||
() => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
let captured: LLM.StreamInput["messages"] = []
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
captured = input.messages
|
||||
}),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
@@ -1380,10 +1381,15 @@ describe("session.compaction.process", () => {
|
||||
auto: false,
|
||||
})
|
||||
|
||||
expect(captured).toContain("older context")
|
||||
expect(captured).not.toContain("keep this turn")
|
||||
expect(captured).not.toContain("and this one too")
|
||||
expect(captured).not.toContain("What did we do so far?")
|
||||
const head = JSON.stringify(captured.slice(0, -1))
|
||||
const prompt = JSON.stringify(captured.at(-1))
|
||||
expect(head).toContain("older context")
|
||||
expect(head).not.toContain("keep this turn")
|
||||
expect(head).not.toContain("and this one too")
|
||||
expect(prompt).toContain("keep this turn")
|
||||
expect(prompt).toContain("and this one too")
|
||||
expect(prompt).toContain("recent-conversation-tail")
|
||||
expect(prompt).not.toContain("What did we do so far?")
|
||||
}).pipe(withCompaction({ llm: stub.layer }))
|
||||
},
|
||||
{ git: true },
|
||||
@@ -1431,7 +1437,7 @@ describe("session.compaction.process", () => {
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
itCompaction.instance("keeps recent pre-compaction turns across repeated compactions", () => {
|
||||
itCompaction.instance("does not replay recent pre-compaction turns across repeated compactions", () => {
|
||||
const stub = llm()
|
||||
stub.push(reply("summary one"))
|
||||
stub.push(reply("summary two"))
|
||||
@@ -1462,8 +1468,8 @@ describe("session.compaction.process", () => {
|
||||
|
||||
expect(ids).not.toContain(u1.id)
|
||||
expect(ids).not.toContain(u2.id)
|
||||
expect(ids).toContain(u3.id)
|
||||
expect(ids).toContain(u4.id)
|
||||
expect(ids).not.toContain(u3.id)
|
||||
expect(ids).not.toContain(u4.id)
|
||||
expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
|
||||
expect(
|
||||
filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
|
||||
@@ -1472,7 +1478,7 @@ describe("session.compaction.process", () => {
|
||||
})
|
||||
|
||||
itCompaction.instance(
|
||||
"ignores previous summaries when sizing the retained tail",
|
||||
"ignores previous summaries when sizing the serialized tail",
|
||||
Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const test = yield* TestInstance
|
||||
@@ -1511,7 +1517,7 @@ describe("session.compaction.process", () => {
|
||||
|
||||
const part = yield* readCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
expect(part?.tail_start_id).toBeUndefined()
|
||||
}).pipe(withCompaction({ config: cfg({ tail_turns: 2, preserve_recent_tokens: 500 }) })),
|
||||
)
|
||||
})
|
||||
|
||||
@@ -785,7 +785,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("retains original tail when compaction stores tail_start_id", async () => {
|
||||
test("ignores original tail when compaction stores tail_start_id", async () => {
|
||||
await WithInstance.provide({
|
||||
directory: root,
|
||||
fn: async () => {
|
||||
@@ -834,14 +834,14 @@ describe("MessageV2.filterCompacted", () => {
|
||||
|
||||
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
|
||||
expect(result.map((item) => item.info.id)).toEqual([c1, s1, u2, a2, u3, a3])
|
||||
expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a3])
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("fork remaps compaction tail_start_id for filterCompacted", async () => {
|
||||
test("fork keeps legacy tail_start_id without replaying the tail", async () => {
|
||||
await WithInstance.provide({
|
||||
directory: root,
|
||||
fn: async () => {
|
||||
@@ -889,7 +889,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
})
|
||||
|
||||
const parentFiltered = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
expect(parentFiltered.map((item) => item.info.id)).toEqual([c1, s1, u2, a2, u3, a3])
|
||||
expect(parentFiltered.map((item) => item.info.id)).toEqual([c1, s1, u3, a3])
|
||||
|
||||
const forked = await svc.fork({ sessionID: session.id })
|
||||
const childFiltered = MessageV2.filterCompacted(MessageV2.stream(forked.id))
|
||||
@@ -899,7 +899,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
expect(tailPart?.type).toBe("compaction")
|
||||
if (!tailPart || tailPart.type !== "compaction") throw new Error("Expected forked compaction part")
|
||||
expect(tailPart.tail_start_id).toBeDefined()
|
||||
expect(childFiltered.some((m) => m.info.id === tailPart.tail_start_id)).toBe(true)
|
||||
expect(childFiltered.some((m) => m.info.id === tailPart.tail_start_id)).toBe(false)
|
||||
|
||||
await svc.remove(forked.id)
|
||||
await svc.remove(session.id)
|
||||
@@ -907,7 +907,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("retains an assistant tail when compaction starts inside a turn", async () => {
|
||||
test("does not replay an assistant tail when compaction starts inside a turn", async () => {
|
||||
await WithInstance.provide({
|
||||
directory: root,
|
||||
fn: async () => {
|
||||
@@ -964,7 +964,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
|
||||
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
|
||||
expect(result.map((item) => item.info.id)).toEqual([c1, s1, a3, u3, a4])
|
||||
expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a4])
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
@@ -1041,7 +1041,7 @@ describe("MessageV2.filterCompacted", () => {
|
||||
|
||||
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
|
||||
expect(result.map((item) => item.info.id)).toEqual([c2, s2, u3, a3, u4, a4])
|
||||
expect(result.map((item) => item.info.id)).toEqual([c2, s2, u4, a4])
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user