diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index 2b36610e58..982e2d8040 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -1,8 +1,7 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { APICallError } from "ai"
-import { Cause, Deferred, Effect, Exit, Layer, ManagedRuntime } from "effect"
+import { Cause, Deferred, Effect, Exit, Fiber, Layer } from "effect"
import * as Stream from "effect/Stream"
-import z from "zod"
import { Bus } from "../../src/bus"
import { Config } from "@/config/config"
import { Image } from "@/image/image"
@@ -10,11 +9,10 @@ import { Agent } from "../../src/agent/agent"
import { LLM } from "../../src/session/llm"
import { SessionCompaction } from "../../src/session/compaction"
import { Token } from "@/util/token"
-import { WithInstance } from "../../src/project/with-instance"
import * as Log from "@opencode-ai/core/util/log"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
-import { provideTmpdirInstance, TestInstance, tmpdir } from "../fixture/fixture"
+import { provideTmpdirInstance, TestInstance } from "../fixture/fixture"
import { Session as SessionNs } from "@/session/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
@@ -32,26 +30,6 @@ import { TestConfig } from "../fixture/config"
void Log.init({ print: false })
-function run(fx: Effect.Effect) {
- return Effect.runPromise(fx.pipe(Effect.provide(SessionNs.defaultLayer)))
-}
-
-const svc = {
- ...SessionNs,
- create(input?: SessionNs.CreateInput) {
- return run(SessionNs.Service.use((svc) => svc.create(input)))
- },
- messages(input: z.output) {
- return run(SessionNs.Service.use((svc) => svc.messages(input)))
- },
- updateMessage(msg: T) {
- return run(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
- },
- updatePart(part: T) {
- return run(SessionNs.Service.use((svc) => svc.updatePart(part)))
- },
-}
-
const summary = Layer.succeed(
SessionSummary.Service,
SessionSummary.Service.of({
@@ -102,50 +80,6 @@ function createModel(opts: {
const wide = () => ProviderTest.fake({ model: createModel({ context: 100_000, output: 32_000 }) })
-async function user(sessionID: SessionID, text: string) {
- const msg = await svc.updateMessage({
- id: MessageID.ascending(),
- role: "user",
- sessionID,
- agent: "build",
- model: ref,
- time: { created: Date.now() },
- })
- await svc.updatePart({
- id: PartID.ascending(),
- messageID: msg.id,
- sessionID,
- type: "text",
- text,
- })
- return msg
-}
-
-async function assistant(sessionID: SessionID, parentID: MessageID, root: string) {
- const msg: MessageV2.Assistant = {
- id: MessageID.ascending(),
- role: "assistant",
- sessionID,
- mode: "build",
- agent: "build",
- path: { cwd: root, root },
- cost: 0,
- tokens: {
- output: 0,
- input: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
- modelID: ref.modelID,
- providerID: ref.providerID,
- parentID,
- time: { created: Date.now() },
- finish: "end_turn",
- }
- await svc.updateMessage(msg)
- return msg
-}
-
function createUserMessage(sessionID: SessionID, text: string) {
return Effect.gen(function* () {
const ssn = yield* SessionNs.Service
@@ -193,37 +127,40 @@ function createAssistantMessage(sessionID: SessionID, parentID: MessageID, root:
)
}
-async function summaryAssistant(sessionID: SessionID, parentID: MessageID, root: string, text: string) {
- const msg: MessageV2.Assistant = {
- id: MessageID.ascending(),
- role: "assistant",
- sessionID,
- mode: "compaction",
- agent: "compaction",
- path: { cwd: root, root },
- cost: 0,
- tokens: {
- output: 0,
- input: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
- modelID: ref.modelID,
- providerID: ref.providerID,
- parentID,
- summary: true,
- time: { created: Date.now() },
- finish: "end_turn",
- }
- await svc.updateMessage(msg)
- await svc.updatePart({
- id: PartID.ascending(),
- messageID: msg.id,
- sessionID,
- type: "text",
- text,
- })
- return msg
+function createSummaryAssistantMessage(sessionID: SessionID, parentID: MessageID, root: string, text: string) {
+ return SessionNs.Service.use((ssn) =>
+ Effect.gen(function* () {
+ const msg = yield* ssn.updateMessage({
+ id: MessageID.ascending(),
+ role: "assistant",
+ sessionID,
+ mode: "compaction",
+ agent: "compaction",
+ path: { cwd: root, root },
+ cost: 0,
+ tokens: {
+ output: 0,
+ input: 0,
+ reasoning: 0,
+ cache: { read: 0, write: 0 },
+ },
+ modelID: ref.modelID,
+ providerID: ref.providerID,
+ parentID,
+ summary: true,
+ time: { created: Date.now() },
+ finish: "end_turn",
+ })
+ yield* ssn.updatePart({
+ id: PartID.ascending(),
+ messageID: msg.id,
+ sessionID,
+ type: "text",
+ text,
+ })
+ return msg
+ }),
+ )
}
function createCompactionMarker(sessionID: SessionID) {
@@ -248,10 +185,6 @@ function createCompactionMarker(sessionID: SessionID) {
)
}
-async function createCompactionMarkerAsync(sessionID: SessionID) {
- return run(createCompactionMarker(sessionID))
-}
-
function fake(
input: Parameters[0],
result: "continue" | "compact",
@@ -283,26 +216,6 @@ function cfg(compaction?: Config.Info["compaction"]) {
})
}
-function runtime(
- result: "continue" | "compact",
- plugin = Plugin.defaultLayer,
- provider = ProviderTest.fake(),
- config = Config.defaultLayer,
-) {
- const bus = Bus.layer
- return ManagedRuntime.make(
- Layer.mergeAll(SessionCompaction.layer, bus).pipe(
- Layer.provide(provider.layer),
- Layer.provide(SessionNs.defaultLayer),
- Layer.provide(layer(result)),
- Layer.provide(Agent.defaultLayer),
- Layer.provide(plugin),
- Layer.provide(bus),
- Layer.provide(config),
- ),
- )
-}
-
const deps = Layer.mergeAll(
wide().layer,
layer("continue"),
@@ -365,10 +278,6 @@ function readCompactionPart(sessionID: SessionID) {
)
}
-async function lastCompactionPart(sessionID: SessionID) {
- return run(readCompactionPart(sessionID))
-}
-
function llm() {
const queue: Array<
Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream)
@@ -391,29 +300,6 @@ function llm() {
}
}
-function liveRuntime(layer: Layer.Layer, provider = ProviderTest.fake(), config = Config.defaultLayer) {
- const bus = Bus.layer
- const status = SessionStatus.layer.pipe(Layer.provide(bus))
- const processor = SessionProcessorModule.SessionProcessor.layer.pipe(
- Layer.provide(summary),
- Layer.provide(Image.defaultLayer),
- )
- return ManagedRuntime.make(
- Layer.mergeAll(SessionCompaction.layer.pipe(Layer.provide(processor)), processor, bus, status).pipe(
- Layer.provide(provider.layer),
- Layer.provide(SessionNs.defaultLayer),
- Layer.provide(Snapshot.defaultLayer),
- Layer.provide(layer),
- Layer.provide(Permission.defaultLayer),
- Layer.provide(Agent.defaultLayer),
- Layer.provide(Plugin.defaultLayer),
- Layer.provide(status),
- Layer.provide(bus),
- Layer.provide(config),
- ),
- )
-}
-
function reply(
text: string,
capture?: (input: LLM.StreamInput) => void,
@@ -469,23 +355,14 @@ function reply(
}
}
-function wait(ms = 50) {
- return new Promise((resolve) => setTimeout(resolve, ms))
-}
-
-function defer() {
- let resolve!: () => void
- const promise = new Promise((done) => {
- resolve = done
- })
- return { promise, resolve }
-}
-
-function plugin(ready: ReturnType) {
+function plugin(ready: Deferred.Deferred) {
return Layer.mock(Plugin.Service)({
trigger: (name: Name, _input: Input, output: Output) => {
if (name !== "experimental.session.compacting") return Effect.succeed(output)
- return Effect.sync(() => ready.resolve()).pipe(Effect.andThen(Effect.never), Effect.as(output))
+ return Effect.sync(() => Deferred.doneUnsafe(ready, Effect.void)).pipe(
+ Effect.andThen(Effect.never),
+ Effect.as(output),
+ )
},
list: () => Effect.succeed([]),
init: () => Effect.void,
@@ -1315,154 +1192,99 @@ describe("session.compaction.process", () => {
}),
)
- test("stops quickly when aborted during retry backoff", async () => {
- const stub = llm()
- const ready = defer()
- stub.push(
- Stream.fromAsyncIterable(
- {
- async *[Symbol.asyncIterator]() {
- yield { type: "start" } as LLM.Event
- throw new APICallError({
- message: "boom",
- url: "https://example.com/v1/chat/completions",
- requestBodyValues: {},
- statusCode: 503,
- responseHeaders: { "retry-after-ms": "10000" },
- responseBody: '{"error":"boom"}',
- isRetryable: true,
- })
+ itProcess.instance(
+ "stops quickly when aborted during retry backoff",
+ () => {
+ const stub = llm()
+ stub.push(
+ Stream.fromAsyncIterable(
+ {
+ async *[Symbol.asyncIterator]() {
+ yield { type: "start" } as LLM.Event
+ throw new APICallError({
+ message: "boom",
+ url: "https://example.com/v1/chat/completions",
+ requestBodyValues: {},
+ statusCode: 503,
+ responseHeaders: { "retry-after-ms": "10000" },
+ responseBody: '{"error":"boom"}',
+ isRetryable: true,
+ })
+ },
},
- },
- (err) => err,
- ),
- )
+ (err) => err,
+ ),
+ )
- await using tmp = await tmpdir({ git: true })
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- const msg = await user(session.id, "hello")
- const msgs = await svc.messages({ sessionID: session.id })
- const abort = new AbortController()
- const rt = liveRuntime(stub.layer, wide())
- let off: (() => void) | undefined
- let run: Promise<"continue" | "stop"> | undefined
- try {
- off = await rt.runPromise(
- Bus.Service.use((svc) =>
- svc.subscribeCallback(SessionStatus.Event.Status, (evt) => {
- if (evt.properties.sessionID !== session.id) return
- if (evt.properties.status.type !== "retry") return
- ready.resolve()
- }),
- ),
- )
+ return Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const bus = yield* Bus.Service
+ const ready = yield* Deferred.make()
+ const session = yield* ssn.create({})
+ const msg = yield* createUserMessage(session.id, "hello")
+ const msgs = yield* ssn.messages({ sessionID: session.id })
+ const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
+ if (evt.properties.sessionID !== session.id) return
+ if (evt.properties.status.type !== "retry") return
+ Deferred.doneUnsafe(ready, Effect.void)
+ })
+ yield* Effect.addFinalizer(() => Effect.sync(off))
- run = rt
- .runPromiseExit(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: msg.id,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- { signal: abort.signal },
- )
- .then((exit) => {
- if (Exit.isFailure(exit)) {
- if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
- throw Cause.squash(exit.cause)
- }
- return exit.value
- })
+ const fiber = yield* SessionCompaction.use
+ .process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ auto: false,
+ })
+ .pipe(Effect.forkChild)
- await Promise.race([
- ready.promise,
- wait(1000).then(() => {
- throw new Error("timed out waiting for retry status")
- }),
- ])
+ yield* Deferred.await(ready).pipe(Effect.timeout("1 second"))
+ const start = Date.now()
+ yield* Fiber.interrupt(fiber)
+ const exit = yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis"))
- const start = Date.now()
- abort.abort()
- const result = await Promise.race([
- run.then((value) => ({ kind: "done" as const, value, ms: Date.now() - start })),
- wait(250).then(() => ({ kind: "timeout" as const })),
- ])
-
- expect(result.kind).toBe("done")
- if (result.kind === "done") {
- expect(result.value).toBe("stop")
- expect(result.ms).toBeLessThan(250)
- }
- } finally {
- off?.()
- abort.abort()
- await rt.dispose()
- await run?.catch(() => undefined)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterrupts(exit.cause)).toBe(true)
+ expect(Date.now() - start).toBeLessThan(250)
}
- },
- })
- })
+ }).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
+ },
+ { git: true },
+ )
- test("does not leave a summary assistant when aborted before processor setup", async () => {
- const ready = defer()
-
- await using tmp = await tmpdir({ git: true })
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- const msg = await user(session.id, "hello")
- const msgs = await svc.messages({ sessionID: session.id })
- const abort = new AbortController()
- const rt = runtime("continue", plugin(ready), wide())
- let run: Promise<"continue" | "stop"> | undefined
- try {
- run = rt
- .runPromiseExit(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: msg.id,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- { signal: abort.signal },
- )
- .then((exit) => {
- if (Exit.isFailure(exit)) {
- if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
- throw Cause.squash(exit.cause)
- }
- return exit.value
+ itProcess.instance(
+ "does not leave a summary assistant when aborted before processor setup",
+ () =>
+ Effect.gen(function* () {
+ const ready = yield* Deferred.make()
+ return yield* Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const session = yield* ssn.create({})
+ const msg = yield* createUserMessage(session.id, "hello")
+ const msgs = yield* ssn.messages({ sessionID: session.id })
+ const fiber = yield* SessionCompaction.use
+ .process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ auto: false,
})
+ .pipe(Effect.forkChild)
- await Promise.race([
- ready.promise,
- wait(1000).then(() => {
- throw new Error("timed out waiting for compaction hook")
- }),
- ])
+ yield* Deferred.await(ready).pipe(Effect.timeout("1 second"))
+ yield* Fiber.interrupt(fiber)
+ const exit = yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis"))
+ const all = yield* ssn.messages({ sessionID: session.id })
- abort.abort()
- expect(await run).toBe("stop")
-
- const all = await svc.messages({ sessionID: session.id })
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) expect(Cause.hasInterrupts(exit.cause)).toBe(true)
expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false)
- } finally {
- abort.abort()
- await rt.dispose()
- await run?.catch(() => undefined)
- }
- },
- })
- })
+ }).pipe(Effect.provide(compactionProcessLayer({ plugin: plugin(ready) })))
+ }),
+ { git: true },
+ )
itProcess.instance(
"does not allow tool calls while generating the summary",
@@ -1533,240 +1355,172 @@ describe("session.compaction.process", () => {
{ git: true },
)
- test("summarizes only the head while keeping recent tail out of summary input", async () => {
- const stub = llm()
- let captured = ""
- stub.push(
- reply("summary", (input) => {
- captured = JSON.stringify(input.messages)
- }),
- )
+ itProcess.instance(
+ "summarizes only the head while keeping recent tail out of summary input",
+ () => {
+ const stub = llm()
+ let captured = ""
+ stub.push(
+ reply("summary", (input) => {
+ captured = JSON.stringify(input.messages)
+ }),
+ )
+ return Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const session = yield* ssn.create({})
+ yield* createUserMessage(session.id, "older context")
+ yield* createUserMessage(session.id, "keep this turn")
+ yield* createUserMessage(session.id, "and this one too")
+ yield* createCompactionMarker(session.id)
- await using tmp = await tmpdir({ git: true })
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- await user(session.id, "older context")
- await user(session.id, "keep this turn")
- await user(session.id, "and this one too")
- await createCompactionMarkerAsync(session.id)
+ const msgs = yield* ssn.messages({ sessionID: session.id })
+ const parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({
+ parentID: parent!,
+ messages: msgs,
+ sessionID: session.id,
+ auto: false,
+ })
- const rt = liveRuntime(stub.layer, wide())
- try {
- const msgs = await svc.messages({ sessionID: session.id })
- const parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
+ expect(captured).toContain("older context")
+ expect(captured).not.toContain("keep this turn")
+ expect(captured).not.toContain("and this one too")
+ expect(captured).not.toContain("What did we do so far?")
+ }).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
+ },
+ { git: true },
+ )
- expect(captured).toContain("older context")
- expect(captured).not.toContain("keep this turn")
- expect(captured).not.toContain("and this one too")
- expect(captured).not.toContain("What did we do so far?")
- } finally {
- await rt.dispose()
- }
- },
- })
- })
+ itProcess.instance(
+ "anchors repeated compactions with the previous summary",
+ () => {
+ const stub = llm()
+ let captured = ""
+ stub.push(reply("summary one"))
+ stub.push(
+ reply("summary two", (input) => {
+ captured = JSON.stringify(input.messages)
+ }),
+ )
- test("anchors repeated compactions with the previous summary", async () => {
- const stub = llm()
- let captured = ""
- stub.push(reply("summary one"))
- stub.push(
- reply("summary two", (input) => {
- captured = JSON.stringify(input.messages)
- }),
- )
+ return Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const session = yield* ssn.create({})
+ yield* createUserMessage(session.id, "older context")
+ yield* createUserMessage(session.id, "keep this turn")
+ yield* createCompactionMarker(session.id)
- await using tmp = await tmpdir({ git: true })
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- await user(session.id, "older context")
- await user(session.id, "keep this turn")
- await createCompactionMarkerAsync(session.id)
+ let msgs = yield* ssn.messages({ sessionID: session.id })
+ let parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
- const rt = liveRuntime(stub.layer, wide())
- try {
- let msgs = await svc.messages({ sessionID: session.id })
- let parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
+ yield* createUserMessage(session.id, "latest turn")
+ yield* createCompactionMarker(session.id)
- await user(session.id, "latest turn")
- await createCompactionMarkerAsync(session.id)
+ msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
+ parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
- msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
- parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
+ expect(captured).toContain("")
+ expect(captured).toContain("summary one")
+ expect(captured.match(/summary one/g)?.length).toBe(1)
+ expect(captured).toContain("## Constraints & Preferences")
+ expect(captured).toContain("## Progress")
+ }).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
+ },
+ { git: true },
+ )
- expect(captured).toContain("")
- expect(captured).toContain("summary one")
- expect(captured.match(/summary one/g)?.length).toBe(1)
- expect(captured).toContain("## Constraints & Preferences")
- expect(captured).toContain("## Progress")
- } finally {
- await rt.dispose()
- }
- },
- })
- })
-
- test("keeps recent pre-compaction turns across repeated compactions", async () => {
+ itProcess.instance("keeps recent pre-compaction turns across repeated compactions", () => {
const stub = llm()
stub.push(reply("summary one"))
stub.push(reply("summary two"))
- await using tmp = await tmpdir()
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- const u1 = await user(session.id, "one")
- const u2 = await user(session.id, "two")
- const u3 = await user(session.id, "three")
- await createCompactionMarkerAsync(session.id)
- const rt = liveRuntime(stub.layer, wide(), cfg({ tail_turns: 2, preserve_recent_tokens: 10_000 }))
- try {
- let msgs = await svc.messages({ sessionID: session.id })
- let parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
+ return Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const session = yield* ssn.create({})
+ const u1 = yield* createUserMessage(session.id, "one")
+ const u2 = yield* createUserMessage(session.id, "two")
+ const u3 = yield* createUserMessage(session.id, "three")
+ yield* createCompactionMarker(session.id)
- const u4 = await user(session.id, "four")
- await createCompactionMarkerAsync(session.id)
+ let msgs = yield* ssn.messages({ sessionID: session.id })
+ let parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
- msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
- parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
+ const u4 = yield* createUserMessage(session.id, "four")
+ yield* createCompactionMarker(session.id)
- const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
- const ids = filtered.map((msg) => msg.info.id)
+ msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
+ parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
- expect(ids).not.toContain(u1.id)
- expect(ids).not.toContain(u2.id)
- expect(ids).toContain(u3.id)
- expect(ids).toContain(u4.id)
- expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
- expect(
- filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
- ).toBe(true)
- } finally {
- await rt.dispose()
- }
- },
- })
+ const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
+ const ids = filtered.map((msg) => msg.info.id)
+
+ expect(ids).not.toContain(u1.id)
+ expect(ids).not.toContain(u2.id)
+ expect(ids).toContain(u3.id)
+ expect(ids).toContain(u4.id)
+ expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
+ expect(
+ filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
+ ).toBe(true)
+ }).pipe(
+ Effect.provide(
+ compactionProcessLayer({ llm: stub.layer, config: cfg({ tail_turns: 2, preserve_recent_tokens: 10_000 }) }),
+ ),
+ )
})
- test("ignores previous summaries when sizing the retained tail", async () => {
- await using tmp = await tmpdir()
- await WithInstance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await svc.create({})
- await user(session.id, "older")
- const keep = await user(session.id, "keep this turn")
- const keepReply = await assistant(session.id, keep.id, tmp.path)
- await svc.updatePart({
- id: PartID.ascending(),
- messageID: keepReply.id,
- sessionID: session.id,
- type: "text",
- text: "keep reply",
- })
+ itProcess.instance(
+ "ignores previous summaries when sizing the retained tail",
+ Effect.gen(function* () {
+ const ssn = yield* SessionNs.Service
+ const test = yield* TestInstance
+ const session = yield* ssn.create({})
+ yield* createUserMessage(session.id, "older")
+ const keep = yield* createUserMessage(session.id, "keep this turn")
+ const keepReply = yield* createAssistantMessage(session.id, keep.id, test.directory)
+ yield* ssn.updatePart({
+ id: PartID.ascending(),
+ messageID: keepReply.id,
+ sessionID: session.id,
+ type: "text",
+ text: "keep reply",
+ })
- await createCompactionMarkerAsync(session.id)
- const firstCompaction = (await svc.messages({ sessionID: session.id })).at(-1)?.info.id
- expect(firstCompaction).toBeTruthy()
- await summaryAssistant(session.id, firstCompaction!, tmp.path, "summary ".repeat(800))
+ yield* createCompactionMarker(session.id)
+ const firstCompaction = (yield* ssn.messages({ sessionID: session.id })).at(-1)?.info.id
+ expect(firstCompaction).toBeTruthy()
+ yield* createSummaryAssistantMessage(session.id, firstCompaction!, test.directory, "summary ".repeat(800))
- const recent = await user(session.id, "recent turn")
- const recentReply = await assistant(session.id, recent.id, tmp.path)
- await svc.updatePart({
- id: PartID.ascending(),
- messageID: recentReply.id,
- sessionID: session.id,
- type: "text",
- text: "recent reply",
- })
+ const recent = yield* createUserMessage(session.id, "recent turn")
+ const recentReply = yield* createAssistantMessage(session.id, recent.id, test.directory)
+ yield* ssn.updatePart({
+ id: PartID.ascending(),
+ messageID: recentReply.id,
+ sessionID: session.id,
+ type: "text",
+ text: "recent reply",
+ })
- await createCompactionMarkerAsync(session.id)
+ yield* createCompactionMarker(session.id)
+ const msgs = yield* ssn.messages({ sessionID: session.id })
+ const parent = msgs.at(-1)?.info.id
+ expect(parent).toBeTruthy()
+ yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
- const rt = runtime("continue", Plugin.defaultLayer, wide(), cfg({ tail_turns: 2, preserve_recent_tokens: 500 }))
- try {
- const msgs = await svc.messages({ sessionID: session.id })
- const parent = msgs.at(-1)?.info.id
- expect(parent).toBeTruthy()
- await rt.runPromise(
- SessionCompaction.Service.use((svc) =>
- svc.process({
- parentID: parent!,
- messages: msgs,
- sessionID: session.id,
- auto: false,
- }),
- ),
- )
-
- const part = await lastCompactionPart(session.id)
- expect(part?.type).toBe("compaction")
- expect(part?.tail_start_id).toBe(keep.id)
- } finally {
- await rt.dispose()
- }
- },
- })
- })
+ const part = yield* readCompactionPart(session.id)
+ expect(part?.type).toBe("compaction")
+ expect(part?.tail_start_id).toBe(keep.id)
+ }).pipe(Effect.provide(compactionProcessLayer({ config: cfg({ tail_turns: 2, preserve_recent_tokens: 500 }) }))),
+ )
})
describe("util.token.estimate", () => {