diff --git a/packages/opencode/test/session/messages-pagination.test.ts b/packages/opencode/test/session/messages-pagination.test.ts index 86e1d85d0d..49828a9b62 100644 --- a/packages/opencode/test/session/messages-pagination.test.ts +++ b/packages/opencode/test/session/messages-pagination.test.ts @@ -1,46 +1,41 @@ import { describe, expect, test } from "bun:test" import { Effect } from "effect" -import path from "path" -import { Instance } from "../../src/project/instance" -import { WithInstance } from "../../src/project/with-instance" import { Session as SessionNs } from "@/session/session" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, type SessionID } from "../../src/session/schema" import { ModelID, ProviderID } from "../../src/provider/schema" import * as Log from "@opencode-ai/core/util/log" +import { testEffect } from "../lib/effect" -const root = path.join(__dirname, "../..") void Log.init({ print: false }) -function run(fx: Effect.Effect) { - return Effect.runPromise(fx.pipe(Effect.provide(SessionNs.defaultLayer))) -} +const it = testEffect(SessionNs.defaultLayer) -const svc = { - ...SessionNs, - create(input?: SessionNs.CreateInput) { - return run(SessionNs.Service.use((svc) => svc.create(input))) - }, - remove(id: SessionID) { - return run(SessionNs.Service.use((svc) => svc.remove(id))) - }, - updateMessage(msg: T) { - return run(SessionNs.Service.use((svc) => svc.updateMessage(msg))) - }, - updatePart(part: T) { - return run(SessionNs.Service.use((svc) => svc.updatePart(part))) - }, - fork(input: { sessionID: SessionID; messageID?: MessageID }) { - return run(SessionNs.Service.use((svc) => svc.fork(input))) - }, -} +const withSession = ( + fn: (input: { session: SessionNs.Interface; sessionID: SessionID }) => Effect.Effect, +) => + Effect.acquireUseRelease( + Effect.gen(function* () { + const session = yield* SessionNs.Service + const created = yield* session.create({}) + return { session, sessionID: created.id } + }), + fn, + (input) => input.session.remove(input.sessionID).pipe(Effect.ignore), + ) -async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) { +// Helper functions using Effect.gen +const fill = Effect.fn("Test.fill")(function* ( + sessionID: SessionID, + count: number, + time = (i: number) => Date.now() + i, +) { + const session = yield* SessionNs.Service const ids = [] as MessageID[] for (let i = 0; i < count; i++) { const id = MessageID.ascending() ids.push(id) - await svc.updateMessage({ + yield* session.updateMessage({ id, sessionID, role: "user", @@ -50,7 +45,7 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D tools: {}, mode: "", } as unknown as MessageV2.Info) - await svc.updatePart({ + yield* session.updatePart({ id: PartID.ascending(), sessionID, messageID: id, @@ -59,11 +54,12 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D }) } return ids -} +}) -async function addUser(sessionID: SessionID, text?: string) { +const addUser = Effect.fn("Test.addUser")(function* (sessionID: SessionID, text?: string) { + const session = yield* SessionNs.Service const id = MessageID.ascending() - await svc.updateMessage({ + yield* session.updateMessage({ id, sessionID, role: "user", @@ -74,7 +70,7 @@ async function addUser(sessionID: SessionID, text?: string) { mode: "", } as unknown as MessageV2.Info) if (text) { - await svc.updatePart({ + yield* session.updatePart({ id: PartID.ascending(), sessionID, messageID: id, @@ -83,15 +79,16 @@ async function addUser(sessionID: SessionID, text?: string) { }) } return id -} +}) -async function addAssistant( +const addAssistant = Effect.fn("Test.addAssistant")(function* ( sessionID: SessionID, parentID: MessageID, opts?: { summary?: boolean; finish?: string; error?: MessageV2.Assistant["error"] }, ) { + const session = yield* SessionNs.Service const id = MessageID.ascending() - await svc.updateMessage({ + yield* session.updateMessage({ id, sessionID, role: "assistant", @@ -109,10 +106,15 @@ async function addAssistant( error: opts?.error, } as unknown as MessageV2.Info) return id -} +}) -async function addCompactionPart(sessionID: SessionID, messageID: MessageID, tailStartID?: MessageID) { - await svc.updatePart({ +const addCompactionPart = Effect.fn("Test.addCompactionPart")(function* ( + sessionID: SessionID, + messageID: MessageID, + tailStartID?: MessageID, +) { + const session = yield* SessionNs.Service + yield* session.updatePart({ id: PartID.ascending(), sessionID, messageID, @@ -120,933 +122,713 @@ async function addCompactionPart(sessionID: SessionID, messageID: MessageID, tai auto: true, tail_start_id: tailStartID, } as any) -} +}) describe("MessageV2.page", () => { - test("returns sync result", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 2) + it.instance("returns sync result", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 2) - const result = MessageV2.page({ sessionID: session.id, limit: 10 }) - expect(result).toBeDefined() - expect(result.items).toBeArray() + const result = MessageV2.page({ sessionID, limit: 10 }) + expect(result).toBeDefined() + expect(result.items).toBeArray() + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("pages backward with opaque cursors", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 6) - test("pages backward with opaque cursors", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 6) + const a = MessageV2.page({ sessionID, limit: 2 }) + expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) + expect(a.items.every((item) => item.parts.length === 1)).toBe(true) + expect(a.more).toBe(true) + expect(a.cursor).toBeTruthy() - const a = MessageV2.page({ sessionID: session.id, limit: 2 }) - expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) - expect(a.items.every((item) => item.parts.length === 1)).toBe(true) - expect(a.more).toBe(true) - expect(a.cursor).toBeTruthy() + const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) + expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) + expect(b.more).toBe(true) + expect(b.cursor).toBeTruthy() - const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) - expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) - expect(b.more).toBe(true) - expect(b.cursor).toBeTruthy() + const c = MessageV2.page({ sessionID, limit: 2, before: b.cursor! }) + expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) + expect(c.more).toBe(false) + expect(c.cursor).toBeUndefined() + })), + ) - const c = MessageV2.page({ sessionID: session.id, limit: 2, before: b.cursor! }) - expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) - expect(c.more).toBe(false) - expect(c.cursor).toBeUndefined() + it.instance("returns items in chronological order within a page", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 4) - await svc.remove(session.id) - }, - }) - }) + const result = MessageV2.page({ sessionID, limit: 4 }) + expect(result.items.map((item) => item.info.id)).toEqual(ids) + })), + ) - test("returns items in chronological order within a page", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 4) + it.instance("returns empty items for session with no messages", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const result = MessageV2.page({ sessionID, limit: 10 }) + expect(result.items).toEqual([]) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + })), + ) - const result = MessageV2.page({ sessionID: session.id, limit: 4 }) - expect(result.items.map((item) => item.info.id)).toEqual(ids) + it.instance("throws NotFoundError for non-existent session", () => + Effect.gen(function* () { + const fake = "non-existent-session" as SessionID + expect(() => MessageV2.page({ sessionID: fake, limit: 10 })).toThrow("NotFoundError") + }), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("handles exact limit boundary", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 3) - test("returns empty items for session with no messages", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + const result = MessageV2.page({ sessionID, limit: 3 }) + expect(result.items.map((item) => item.info.id)).toEqual(ids) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + })), + ) - const result = MessageV2.page({ sessionID: session.id, limit: 10 }) - expect(result.items).toEqual([]) - expect(result.more).toBe(false) - expect(result.cursor).toBeUndefined() + it.instance("limit of 1 returns single newest message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 5) - await svc.remove(session.id) - }, - }) - }) + const result = MessageV2.page({ sessionID, limit: 1 }) + expect(result.items).toHaveLength(1) + expect(result.items[0].info.id).toBe(ids[ids.length - 1]) + expect(result.more).toBe(true) + })), + ) - test("throws NotFoundError for non-existent session", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const fake = "non-existent-session" as SessionID - expect(() => MessageV2.page({ sessionID: fake, limit: 10 })).toThrow("NotFoundError") - }, - }) - }) + it.instance("hydrates multiple parts per message", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - test("handles exact limit boundary", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 3) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: id, + type: "text", + text: "extra", + }) - const result = MessageV2.page({ sessionID: session.id, limit: 3 }) - expect(result.items.map((item) => item.info.id)).toEqual(ids) - expect(result.more).toBe(false) - expect(result.cursor).toBeUndefined() + const result = MessageV2.page({ sessionID, limit: 10 }) + expect(result.items).toHaveLength(1) + expect(result.items[0].parts).toHaveLength(2) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("accepts cursors from fractional timestamps", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 4, (i: number) => 1000.5 + i) - test("limit of 1 returns single newest message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 5) + const a = MessageV2.page({ sessionID, limit: 2 }) + const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) - const result = MessageV2.page({ sessionID: session.id, limit: 1 }) - expect(result.items).toHaveLength(1) - expect(result.items[0].info.id).toBe(ids[ids.length - 1]) - expect(result.more).toBe(true) + expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) + expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("messages with same timestamp are ordered by id", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 4, () => 1000) - test("hydrates multiple parts per message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + const a = MessageV2.page({ sessionID, limit: 2 }) + expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) + expect(a.more).toBe(true) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: id, - type: "text", - text: "extra", - }) + const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) + expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) + expect(b.more).toBe(false) + })), + ) - const result = MessageV2.page({ sessionID: session.id, limit: 10 }) - expect(result.items).toHaveLength(1) - expect(result.items[0].parts).toHaveLength(2) + it.instance("does not return messages from other sessions", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const a = yield* session.create({}) + const b = yield* session.create({}) + yield* fill(a.id, 3) + yield* fill(b.id, 2) - await svc.remove(session.id) - }, - }) - }) + const resultA = MessageV2.page({ sessionID: a.id, limit: 10 }) + const resultB = MessageV2.page({ sessionID: b.id, limit: 10 }) + expect(resultA.items).toHaveLength(3) + expect(resultB.items).toHaveLength(2) + expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true) + expect(resultB.items.every((item) => item.info.sessionID === b.id)).toBe(true) - test("accepts cursors from fractional timestamps", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 4, (i) => 1000.5 + i) + yield* session.remove(a.id) + yield* session.remove(b.id) + }), + ) - const a = MessageV2.page({ sessionID: session.id, limit: 2 }) - const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) + it.instance("large limit returns all messages without cursor", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 10) - expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) - expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) - - await svc.remove(session.id) - }, - }) - }) - - test("messages with same timestamp are ordered by id", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 4, () => 1000) - - const a = MessageV2.page({ sessionID: session.id, limit: 2 }) - expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) - expect(a.more).toBe(true) - - const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) - expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) - expect(b.more).toBe(false) - - await svc.remove(session.id) - }, - }) - }) - - test("does not return messages from other sessions", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const a = await svc.create({}) - const b = await svc.create({}) - await fill(a.id, 3) - await fill(b.id, 2) - - const resultA = MessageV2.page({ sessionID: a.id, limit: 10 }) - const resultB = MessageV2.page({ sessionID: b.id, limit: 10 }) - expect(resultA.items).toHaveLength(3) - expect(resultB.items).toHaveLength(2) - expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true) - expect(resultB.items.every((item) => item.info.sessionID === b.id)).toBe(true) - - await svc.remove(a.id) - await svc.remove(b.id) - }, - }) - }) - - test("large limit returns all messages without cursor", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 10) - - const result = MessageV2.page({ sessionID: session.id, limit: 100 }) - expect(result.items).toHaveLength(10) - expect(result.items.map((item) => item.info.id)).toEqual(ids) - expect(result.more).toBe(false) - expect(result.cursor).toBeUndefined() - - await svc.remove(session.id) - }, - }) - }) + const result = MessageV2.page({ sessionID, limit: 100 }) + expect(result.items).toHaveLength(10) + expect(result.items.map((item) => item.info.id)).toEqual(ids) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + })), + ) }) describe("MessageV2.stream", () => { - test("yields items newest first", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 5) + it.instance("yields items newest first", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 5) - const items = Array.from(MessageV2.stream(session.id)) - expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse()) + const items = Array.from(MessageV2.stream(sessionID)) + expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse()) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("yields nothing for empty session", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const items = Array.from(MessageV2.stream(sessionID)) + expect(items).toHaveLength(0) + })), + ) - test("yields nothing for empty session", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + it.instance("yields single message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 1) - const items = Array.from(MessageV2.stream(session.id)) - expect(items).toHaveLength(0) + const items = Array.from(MessageV2.stream(sessionID)) + expect(items).toHaveLength(1) + expect(items[0].info.id).toBe(ids[0]) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("hydrates parts for each yielded message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 3) - test("yields single message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 1) + const items = Array.from(MessageV2.stream(sessionID)) + for (const item of items) { + expect(item.parts).toHaveLength(1) + expect(item.parts[0].type).toBe("text") + } + })), + ) - const items = Array.from(MessageV2.stream(session.id)) - expect(items).toHaveLength(1) - expect(items[0].info.id).toBe(ids[0]) + it.instance("handles sets exceeding internal page size", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 60) - await svc.remove(session.id) - }, - }) - }) + const items = Array.from(MessageV2.stream(sessionID)) + expect(items).toHaveLength(60) + expect(items[0].info.id).toBe(ids[ids.length - 1]) + expect(items[59].info.id).toBe(ids[0]) + })), + ) - test("hydrates parts for each yielded message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 3) + it.instance("is a sync generator", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 1) - const items = Array.from(MessageV2.stream(session.id)) - for (const item of items) { - expect(item.parts).toHaveLength(1) - expect(item.parts[0].type).toBe("text") - } - - await svc.remove(session.id) - }, - }) - }) - - test("handles sets exceeding internal page size", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 60) - - const items = Array.from(MessageV2.stream(session.id)) - expect(items).toHaveLength(60) - expect(items[0].info.id).toBe(ids[ids.length - 1]) - expect(items[59].info.id).toBe(ids[0]) - - await svc.remove(session.id) - }, - }) - }) - - test("is a sync generator", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 1) - - const gen = MessageV2.stream(session.id) - const first = gen.next() - // sync generator returns { value, done } directly, not a Promise - expect(first).toHaveProperty("value") - expect(first).toHaveProperty("done") - expect(first.done).toBe(false) - - await svc.remove(session.id) - }, - }) - }) + const gen = MessageV2.stream(sessionID) + const first = gen.next() + // sync generator returns { value, done } directly, not a Promise + expect(first).toHaveProperty("value") + expect(first).toHaveProperty("done") + expect(first.done).toBe(false) + })), + ) }) describe("MessageV2.parts", () => { - test("returns parts for a message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + it.instance("returns parts for a message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - const result = MessageV2.parts(id) - expect(result).toHaveLength(1) - expect(result[0].type).toBe("text") - expect((result[0] as MessageV2.TextPart).text).toBe("m0") + const result = MessageV2.parts(id) + expect(result).toHaveLength(1) + expect(result[0].type).toBe("text") + expect((result[0] as MessageV2.TextPart).text).toBe("m0") + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("returns empty array for message with no parts", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const id = yield* addUser(sessionID) - test("returns empty array for message with no parts", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const id = await addUser(session.id) + const result = MessageV2.parts(id) + expect(result).toEqual([]) + })), + ) - const result = MessageV2.parts(id) - expect(result).toEqual([]) + it.instance("returns multiple parts in order", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - await svc.remove(session.id) - }, - }) - }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: id, + type: "text", + text: "second", + }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: id, + type: "text", + text: "third", + }) - test("returns multiple parts in order", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + const result = MessageV2.parts(id) + expect(result).toHaveLength(3) + expect((result[0] as MessageV2.TextPart).text).toBe("m0") + expect((result[1] as MessageV2.TextPart).text).toBe("second") + expect((result[2] as MessageV2.TextPart).text).toBe("third") + })), + ) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: id, - type: "text", - text: "second", - }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: id, - type: "text", - text: "third", - }) + it.instance("returns empty for non-existent message id", () => + Effect.gen(function* () { + yield* SessionNs.Service + const result = MessageV2.parts(MessageID.ascending()) + expect(result).toEqual([]) + }), + ) - const result = MessageV2.parts(id) - expect(result).toHaveLength(3) - expect((result[0] as MessageV2.TextPart).text).toBe("m0") - expect((result[1] as MessageV2.TextPart).text).toBe("second") - expect((result[2] as MessageV2.TextPart).text).toBe("third") + it.instance("parts contain sessionID and messageID", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - await svc.remove(session.id) - }, - }) - }) - - test("returns empty for non-existent message id", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - await svc.create({}) - const result = MessageV2.parts(MessageID.ascending()) - expect(result).toEqual([]) - }, - }) - }) - - test("parts contain sessionID and messageID", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) - - const result = MessageV2.parts(id) - expect(result[0].sessionID).toBe(session.id) - expect(result[0].messageID).toBe(id) - - await svc.remove(session.id) - }, - }) - }) + const result = MessageV2.parts(id) + expect(result[0].sessionID).toBe(sessionID) + expect(result[0].messageID).toBe(id) + })), + ) }) describe("MessageV2.get", () => { - test("returns message with hydrated parts", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + it.instance("returns message with hydrated parts", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - const result = MessageV2.get({ sessionID: session.id, messageID: id }) - expect(result.info.id).toBe(id) - expect(result.info.sessionID).toBe(session.id) - expect(result.info.role).toBe("user") - expect(result.parts).toHaveLength(1) - expect((result.parts[0] as MessageV2.TextPart).text).toBe("m0") + const result = MessageV2.get({ sessionID, messageID: id }) + expect(result.info.id).toBe(id) + expect(result.info.sessionID).toBe(sessionID) + expect(result.info.role).toBe("user") + expect(result.parts).toHaveLength(1) + expect((result.parts[0] as MessageV2.TextPart).text).toBe("m0") + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("throws NotFoundError for non-existent message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + expect(() => MessageV2.get({ sessionID, messageID: MessageID.ascending() })).toThrow( + "NotFoundError", + ) + })), + ) - test("throws NotFoundError for non-existent message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + it.instance("scopes by session id", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const a = yield* session.create({}) + const b = yield* session.create({}) + const [id] = yield* fill(a.id, 1) - expect(() => MessageV2.get({ sessionID: session.id, messageID: MessageID.ascending() })).toThrow( - "NotFoundError", - ) + expect(() => MessageV2.get({ sessionID: b.id, messageID: id })).toThrow("NotFoundError") + const result = MessageV2.get({ sessionID: a.id, messageID: id }) + expect(result.info.id).toBe(id) - await svc.remove(session.id) - }, - }) - }) + yield* session.remove(a.id) + yield* session.remove(b.id) + }), + ) - test("scopes by session id", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const a = await svc.create({}) - const b = await svc.create({}) - const [id] = await fill(a.id, 1) + it.instance("returns message with multiple parts", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) - expect(() => MessageV2.get({ sessionID: b.id, messageID: id })).toThrow("NotFoundError") - const result = MessageV2.get({ sessionID: a.id, messageID: id }) - expect(result.info.id).toBe(id) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: id, + type: "text", + text: "extra", + }) - await svc.remove(a.id) - await svc.remove(b.id) - }, - }) - }) + const result = MessageV2.get({ sessionID, messageID: id }) + expect(result.parts).toHaveLength(2) + })), + ) - test("returns message with multiple parts", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + it.instance("returns assistant message with correct role", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const uid = yield* addUser(sessionID, "hello") + const aid = yield* addAssistant(sessionID, uid) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: id, - type: "text", - text: "extra", - }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: aid, + type: "text", + text: "response", + }) - const result = MessageV2.get({ sessionID: session.id, messageID: id }) - expect(result.parts).toHaveLength(2) + const result = MessageV2.get({ sessionID, messageID: aid }) + expect(result.info.role).toBe("assistant") + expect(result.parts).toHaveLength(1) + expect((result.parts[0] as MessageV2.TextPart).text).toBe("response") + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("returns message with zero parts", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const id = yield* addUser(sessionID) - test("returns assistant message with correct role", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const uid = await addUser(session.id, "hello") - const aid = await addAssistant(session.id, uid) - - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: aid, - type: "text", - text: "response", - }) - - const result = MessageV2.get({ sessionID: session.id, messageID: aid }) - expect(result.info.role).toBe("assistant") - expect(result.parts).toHaveLength(1) - expect((result.parts[0] as MessageV2.TextPart).text).toBe("response") - - await svc.remove(session.id) - }, - }) - }) - - test("returns message with zero parts", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const id = await addUser(session.id) - - const result = MessageV2.get({ sessionID: session.id, messageID: id }) - expect(result.info.id).toBe(id) - expect(result.parts).toEqual([]) - - await svc.remove(session.id) - }, - }) - }) + const result = MessageV2.get({ sessionID, messageID: id }) + expect(result.info.id).toBe(id) + expect(result.parts).toEqual([]) + })), + ) }) describe("MessageV2.filterCompacted", () => { - test("returns all messages when no compaction", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const ids = await fill(session.id, 5) + it.instance("returns all messages when no compaction", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const ids = yield* fill(sessionID, 5) - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - expect(result).toHaveLength(5) - // reversed from newest-first to chronological - expect(result.map((item) => item.info.id)).toEqual(ids) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + expect(result).toHaveLength(5) + // reversed from newest-first to chronological + expect(result.map((item) => item.info.id)).toEqual(ids) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("stops at compaction boundary and returns chronological order", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + // Chronological: u1(+compaction part), a1(summary, parentID=u1), u2, a2 + // Stream (newest first): a2, u2, a1(adds u1 to completed), u1(in completed + compaction) -> break + const u1 = yield* addUser(sessionID, "first question") + const a1 = yield* addAssistant(sessionID, u1, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a1, + type: "text", + text: "summary", + }) + yield* addCompactionPart(sessionID, u1) - test("stops at compaction boundary and returns chronological order", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + const u2 = yield* addUser(sessionID, "new question") + const a2 = yield* addAssistant(sessionID, u2) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a2, + type: "text", + text: "new response", + }) - // Chronological: u1(+compaction part), a1(summary, parentID=u1), u2, a2 - // Stream (newest first): a2, u2, a1(adds u1 to completed), u1(in completed + compaction) -> break - const u1 = await addUser(session.id, "first question") - const a1 = await addAssistant(session.id, u1, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a1, - type: "text", - text: "summary", - }) - await addCompactionPart(session.id, u1) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + // Includes compaction boundary: u1, a1, u2, a2 + expect(result[0].info.id).toBe(u1) + expect(result.length).toBe(4) + })), + ) - const u2 = await addUser(session.id, "new question") - const a2 = await addAssistant(session.id, u2) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a2, - type: "text", - text: "new response", - }) - - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - // Includes compaction boundary: u1, a1, u2, a2 - expect(result[0].info.id).toBe(u1) - expect(result.length).toBe(4) - - await svc.remove(session.id) - }, - }) - }) - - test("handles empty iterable", () => { + it.live("handles empty iterable", () => Effect.sync(() => { const result = MessageV2.filterCompacted([]) expect(result).toEqual([]) - }) + })) - test("does not break on compaction part without matching summary", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + it.instance("does not break on compaction part without matching summary", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "hello") + yield* addCompactionPart(sessionID, u1) + yield* addUser(sessionID, "world") - const u1 = await addUser(session.id, "hello") - await addCompactionPart(session.id, u1) - await addUser(session.id, "world") + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + expect(result).toHaveLength(2) + })), + ) - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - expect(result).toHaveLength(2) + it.instance("skips assistant with error even if marked as summary", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "hello") + yield* addCompactionPart(sessionID, u1) - await svc.remove(session.id) - }, - }) - }) + const error = new MessageV2.APIError({ + message: "boom", + isRetryable: true, + }).toObject() as MessageV2.Assistant["error"] + yield* addAssistant(sessionID, u1, { summary: true, finish: "end_turn", error }) + yield* addUser(sessionID, "retry") - test("skips assistant with error even if marked as summary", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + // Error assistant doesn't add to completed, so compaction boundary never triggers + expect(result).toHaveLength(3) + })), + ) - const u1 = await addUser(session.id, "hello") - await addCompactionPart(session.id, u1) + it.instance("skips assistant without finish even if marked as summary", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "hello") + yield* addCompactionPart(sessionID, u1) - const error = new MessageV2.APIError({ - message: "boom", - isRetryable: true, - }).toObject() as MessageV2.Assistant["error"] - await addAssistant(session.id, u1, { summary: true, finish: "end_turn", error }) - await addUser(session.id, "retry") + // summary=true but no finish + yield* addAssistant(sessionID, u1, { summary: true }) + yield* addUser(sessionID, "next") - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - // Error assistant doesn't add to completed, so compaction boundary never triggers - expect(result).toHaveLength(3) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + expect(result).toHaveLength(3) + })), + ) - await svc.remove(session.id) - }, - }) - }) + it.instance("ignores original tail when compaction stores tail_start_id", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "first") + const a1 = yield* addAssistant(sessionID, u1, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a1, + type: "text", + text: "first reply", + }) - test("skips assistant without finish even if marked as summary", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + const u2 = yield* addUser(sessionID, "second") + const a2 = yield* addAssistant(sessionID, u2, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a2, + type: "text", + text: "second reply", + }) - const u1 = await addUser(session.id, "hello") - await addCompactionPart(session.id, u1) + const c1 = yield* addUser(sessionID) + yield* addCompactionPart(sessionID, c1, u2) + const s1 = yield* addAssistant(sessionID, c1, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: s1, + type: "text", + text: "summary", + }) - // summary=true but no finish - await addAssistant(session.id, u1, { summary: true }) - await addUser(session.id, "next") + const u3 = yield* addUser(sessionID, "third") + const a3 = yield* addAssistant(sessionID, u3, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a3, + type: "text", + text: "third reply", + }) - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - expect(result).toHaveLength(3) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) - await svc.remove(session.id) - }, - }) - }) + expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a3]) + })), + ) - test("ignores original tail when compaction stores tail_start_id", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + it.instance("fork keeps legacy tail_start_id without replaying the tail", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const created = yield* session.create({}) - const u1 = await addUser(session.id, "first") - const a1 = await addAssistant(session.id, u1, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a1, - type: "text", - text: "first reply", - }) + const u1 = yield* addUser(created.id, "first") + const a1 = yield* addAssistant(created.id, u1, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID: created.id, + messageID: a1, + type: "text", + text: "first reply", + }) - const u2 = await addUser(session.id, "second") - const a2 = await addAssistant(session.id, u2, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a2, - type: "text", - text: "second reply", - }) + const u2 = yield* addUser(created.id, "second") + const a2 = yield* addAssistant(created.id, u2, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID: created.id, + messageID: a2, + type: "text", + text: "second reply", + }) - const c1 = await addUser(session.id) - await addCompactionPart(session.id, c1, u2) - const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: s1, - type: "text", - text: "summary", - }) + const c1 = yield* addUser(created.id) + yield* addCompactionPart(created.id, c1, u2) + const s1 = yield* addAssistant(created.id, c1, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID: created.id, + messageID: s1, + type: "text", + text: "summary", + }) - const u3 = await addUser(session.id, "third") - const a3 = await addAssistant(session.id, u3, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a3, - type: "text", - text: "third reply", - }) + const u3 = yield* addUser(created.id, "third") + const a3 = yield* addAssistant(created.id, u3, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID: created.id, + messageID: a3, + type: "text", + text: "third reply", + }) - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + const parentFiltered = MessageV2.filterCompacted(MessageV2.stream(created.id)) + expect(parentFiltered.map((item) => item.info.id)).toEqual([c1, s1, u3, a3]) - expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a3]) + const forked = yield* session.fork({ sessionID: created.id }) + const childFiltered = MessageV2.filterCompacted(MessageV2.stream(forked.id)) + expect(childFiltered).toHaveLength(parentFiltered.length) - await svc.remove(session.id) - }, - }) - }) + const tailPart = childFiltered.flatMap((m) => m.parts).find((p) => p.type === "compaction") + expect(tailPart?.type).toBe("compaction") + if (!tailPart || tailPart.type !== "compaction") throw new Error("Expected forked compaction part") + expect(tailPart.tail_start_id).toBeDefined() + expect(childFiltered.some((m) => m.info.id === tailPart.tail_start_id)).toBe(false) - test("fork keeps legacy tail_start_id without replaying the tail", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + yield* session.remove(forked.id) + yield* session.remove(created.id) + }), + ) - const u1 = await addUser(session.id, "first") - const a1 = await addAssistant(session.id, u1, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a1, - type: "text", - text: "first reply", - }) + it.instance("does not replay an assistant tail when compaction starts inside a turn", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "first") + const a1 = yield* addAssistant(sessionID, u1, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a1, + type: "text", + text: "first reply", + }) - const u2 = await addUser(session.id, "second") - const a2 = await addAssistant(session.id, u2, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a2, - type: "text", - text: "second reply", - }) + const u2 = yield* addUser(sessionID, "second") + const a2 = yield* addAssistant(sessionID, u2, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a2, + type: "text", + text: "second reply", + }) + const a3 = yield* addAssistant(sessionID, u2, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a3, + type: "text", + text: "tail reply", + }) - const c1 = await addUser(session.id) - await addCompactionPart(session.id, c1, u2) - const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: s1, - type: "text", - text: "summary", - }) + const c1 = yield* addUser(sessionID) + yield* addCompactionPart(sessionID, c1, a3) + const s1 = yield* addAssistant(sessionID, c1, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: s1, + type: "text", + text: "summary", + }) - const u3 = await addUser(session.id, "third") - const a3 = await addAssistant(session.id, u3, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a3, - type: "text", - text: "third reply", - }) + const u3 = yield* addUser(sessionID, "third") + const a4 = yield* addAssistant(sessionID, u3, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a4, + type: "text", + text: "third reply", + }) - const parentFiltered = MessageV2.filterCompacted(MessageV2.stream(session.id)) - expect(parentFiltered.map((item) => item.info.id)).toEqual([c1, s1, u3, a3]) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) - const forked = await svc.fork({ sessionID: session.id }) - const childFiltered = MessageV2.filterCompacted(MessageV2.stream(forked.id)) - expect(childFiltered).toHaveLength(parentFiltered.length) + expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a4]) + })), + ) - const tailPart = childFiltered.flatMap((m) => m.parts).find((p) => p.type === "compaction") - expect(tailPart?.type).toBe("compaction") - if (!tailPart || tailPart.type !== "compaction") throw new Error("Expected forked compaction part") - expect(tailPart.tail_start_id).toBeDefined() - expect(childFiltered.some((m) => m.info.id === tailPart.tail_start_id)).toBe(false) + it.instance("prefers latest compaction boundary when repeated compactions exist", () => + withSession(({ session, sessionID }) => Effect.gen(function* () { + const u1 = yield* addUser(sessionID, "first") + const a1 = yield* addAssistant(sessionID, u1, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a1, + type: "text", + text: "first reply", + }) - await svc.remove(forked.id) - await svc.remove(session.id) - }, - }) - }) + const u2 = yield* addUser(sessionID, "second") + const a2 = yield* addAssistant(sessionID, u2, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a2, + type: "text", + text: "second reply", + }) - test("does not replay an assistant tail when compaction starts inside a turn", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) + const c1 = yield* addUser(sessionID) + yield* addCompactionPart(sessionID, c1, u2) + const s1 = yield* addAssistant(sessionID, c1, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: s1, + type: "text", + text: "summary one", + }) - const u1 = await addUser(session.id, "first") - const a1 = await addAssistant(session.id, u1, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a1, - type: "text", - text: "first reply", - }) + const u3 = yield* addUser(sessionID, "third") + const a3 = yield* addAssistant(sessionID, u3, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a3, + type: "text", + text: "third reply", + }) - const u2 = await addUser(session.id, "second") - const a2 = await addAssistant(session.id, u2, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a2, - type: "text", - text: "second reply", - }) - const a3 = await addAssistant(session.id, u2, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a3, - type: "text", - text: "tail reply", - }) + const c2 = yield* addUser(sessionID) + yield* addCompactionPart(sessionID, c2, u3) + const s2 = yield* addAssistant(sessionID, c2, { summary: true, finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: s2, + type: "text", + text: "summary two", + }) - const c1 = await addUser(session.id) - await addCompactionPart(session.id, c1, a3) - const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: s1, - type: "text", - text: "summary", - }) + const u4 = yield* addUser(sessionID, "fourth") + const a4 = yield* addAssistant(sessionID, u4, { finish: "end_turn" }) + yield* session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: a4, + type: "text", + text: "fourth reply", + }) - const u3 = await addUser(session.id, "third") - const a4 = await addAssistant(session.id, u3, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a4, - type: "text", - text: "third reply", - }) + const result = MessageV2.filterCompacted(MessageV2.stream(sessionID)) - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - - expect(result.map((item) => item.info.id)).toEqual([c1, s1, u3, a4]) - - await svc.remove(session.id) - }, - }) - }) - - test("prefers latest compaction boundary when repeated compactions exist", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - - const u1 = await addUser(session.id, "first") - const a1 = await addAssistant(session.id, u1, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a1, - type: "text", - text: "first reply", - }) - - const u2 = await addUser(session.id, "second") - const a2 = await addAssistant(session.id, u2, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a2, - type: "text", - text: "second reply", - }) - - const c1 = await addUser(session.id) - await addCompactionPart(session.id, c1, u2) - const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: s1, - type: "text", - text: "summary one", - }) - - const u3 = await addUser(session.id, "third") - const a3 = await addAssistant(session.id, u3, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a3, - type: "text", - text: "third reply", - }) - - const c2 = await addUser(session.id) - await addCompactionPart(session.id, c2, u3) - const s2 = await addAssistant(session.id, c2, { summary: true, finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: s2, - type: "text", - text: "summary two", - }) - - const u4 = await addUser(session.id, "fourth") - const a4 = await addAssistant(session.id, u4, { finish: "end_turn" }) - await svc.updatePart({ - id: PartID.ascending(), - sessionID: session.id, - messageID: a4, - type: "text", - text: "fourth reply", - }) - - const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) - - expect(result.map((item) => item.info.id)).toEqual([c2, s2, u4, a4]) - - await svc.remove(session.id) - }, - }) - }) + expect(result.map((item) => item.info.id)).toEqual([c2, s2, u4, a4]) + })), + ) test("works with array input", () => { // filterCompacted accepts any Iterable, not just generators @@ -1093,82 +875,58 @@ describe("MessageV2.cursor", () => { }) describe("MessageV2 consistency", () => { - test("page hydration matches get for each message", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 3) + it.instance("page hydration matches get for each message", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 3) - const paged = MessageV2.page({ sessionID: session.id, limit: 10 }) - for (const item of paged.items) { - const got = MessageV2.get({ sessionID: session.id, messageID: item.info.id as MessageID }) - expect(got.info).toEqual(item.info) - expect(got.parts).toEqual(item.parts) + const paged = MessageV2.page({ sessionID, limit: 10 }) + for (const item of paged.items) { + const got = MessageV2.get({ sessionID, messageID: item.info.id as MessageID }) + expect(got.info).toEqual(item.info) + expect(got.parts).toEqual(item.parts) + } + })), + ) + + it.instance("parts from get match standalone parts call", () => + withSession(({ sessionID }) => Effect.gen(function* () { + const [id] = yield* fill(sessionID, 1) + + const got = MessageV2.get({ sessionID, messageID: id }) + const standalone = MessageV2.parts(id) + expect(got.parts).toEqual(standalone) + })), + ) + + it.instance("stream collects same messages as exhaustive page iteration", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 7) + + const streamed = Array.from(MessageV2.stream(sessionID)) + + const paged = [] as MessageV2.WithParts[] + let cursor: string | undefined + while (true) { + const result = MessageV2.page({ sessionID, limit: 3, before: cursor }) + for (let i = result.items.length - 1; i >= 0; i--) { + paged.push(result.items[i]) } + if (!result.more || !result.cursor) break + cursor = result.cursor + } - await svc.remove(session.id) - }, - }) - }) + expect(streamed.map((m) => m.info.id)).toEqual(paged.map((m) => m.info.id)) + })), + ) - test("parts from get match standalone parts call", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - const [id] = await fill(session.id, 1) + it.instance("filterCompacted of full stream returns same as Array.from when no compaction", () => + withSession(({ sessionID }) => Effect.gen(function* () { + yield* fill(sessionID, 4) - const got = MessageV2.get({ sessionID: session.id, messageID: id }) - const standalone = MessageV2.parts(id) - expect(got.parts).toEqual(standalone) + const filtered = MessageV2.filterCompacted(MessageV2.stream(sessionID)) + const all = Array.from(MessageV2.stream(sessionID)).reverse() - await svc.remove(session.id) - }, - }) - }) - - test("stream collects same messages as exhaustive page iteration", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 7) - - const streamed = Array.from(MessageV2.stream(session.id)) - - const paged = [] as MessageV2.WithParts[] - let cursor: string | undefined - while (true) { - const result = MessageV2.page({ sessionID: session.id, limit: 3, before: cursor }) - for (let i = result.items.length - 1; i >= 0; i--) { - paged.push(result.items[i]) - } - if (!result.more || !result.cursor) break - cursor = result.cursor - } - - expect(streamed.map((m) => m.info.id)).toEqual(paged.map((m) => m.info.id)) - - await svc.remove(session.id) - }, - }) - }) - - test("filterCompacted of full stream returns same as Array.from when no compaction", async () => { - await WithInstance.provide({ - directory: root, - fn: async () => { - const session = await svc.create({}) - await fill(session.id, 4) - - const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id)) - const all = Array.from(MessageV2.stream(session.id)).reverse() - - expect(filtered.map((m) => m.info.id)).toEqual(all.map((m) => m.info.id)) - - await svc.remove(session.id) - }, - }) - }) + expect(filtered.map((m) => m.info.id)).toEqual(all.map((m) => m.info.id)) + })), + ) })