diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 999a37b122..e48b1c7b08 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template: const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") const msgs = structuredClone(messages) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) + const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true }) const ctx = yield* InstanceState.context const msg: MessageV2.Assistant = { id: MessageID.ascending(), diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 5ed5acafaf..f697f19f12 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -593,15 +593,13 @@ export namespace Session { }) const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) { - return yield* Effect.promise(async () => { - const result = [] as MessageV2.WithParts[] - for await (const msg of MessageV2.stream(input.sessionID)) { - if (input.limit && result.length >= input.limit) break - result.push(msg) - } - result.reverse() - return result - }) + if (input.limit) { + const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit }) + return result.items + } + const all = yield* MessageV2.streamEffect(input.sessionID) + all.reverse() + return all }) const removeMessage = Effect.fn("Session.removeMessage")(function* (input: { diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 7260a8af2e..1b0b27f9aa 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -15,6 +15,7 @@ import { errorMessage } from "@/util/error" import type { SystemError } from "bun" import type { Provider } from "@/provider/provider" import { ModelID, ProviderID } from "@/provider/schema" +import { Effect } from "effect" /** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */ interface FetchDecompressionError extends Error { @@ -547,7 +548,7 @@ export namespace MessageV2 { and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)), ) - async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) { + function hydrate(rows: (typeof MessageTable.$inferSelect)[]) { const ids = rows.map((row) => row.id) const partByMessage = new Map() if (ids.length > 0) { @@ -809,48 +810,58 @@ export namespace MessageV2 { ) } + export const toModelMessagesEffect = Effect.fnUntraced(function* ( + input: WithParts[], + model: Provider.Model, + options?: { stripMedia?: boolean }, + ) { + return yield* Effect.promise(() => toModelMessages(input, model, options)) + }) + + function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) { + const before = input.before ? cursor.decode(input.before) : undefined + const where = before + ? and(eq(MessageTable.session_id, input.sessionID), older(before)) + : eq(MessageTable.session_id, input.sessionID) + const rows = Database.use((db) => + db + .select() + .from(MessageTable) + .where(where) + .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) + .limit(input.limit + 1) + .all(), + ) + if (rows.length === 0) { + const row = Database.use((db) => + db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(), + ) + if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` }) + return { + items: [] as MessageV2.WithParts[], + more: false, + } + } + + const more = rows.length > input.limit + const slice = more ? rows.slice(0, input.limit) : rows + const items = hydrate(slice) + items.reverse() + const tail = slice.at(-1) + return { + items, + more, + cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined, + } + } + export const page = fn( z.object({ sessionID: SessionID.zod, limit: z.number().int().positive(), before: z.string().optional(), }), - async (input) => { - const before = input.before ? cursor.decode(input.before) : undefined - const where = before - ? and(eq(MessageTable.session_id, input.sessionID), older(before)) - : eq(MessageTable.session_id, input.sessionID) - const rows = Database.use((db) => - db - .select() - .from(MessageTable) - .where(where) - .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) - .limit(input.limit + 1) - .all(), - ) - if (rows.length === 0) { - const row = Database.use((db) => - db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(), - ) - if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` }) - return { - items: [] as MessageV2.WithParts[], - more: false, - } - } - - const more = rows.length > input.limit - const page = more ? rows.slice(0, input.limit) : rows - const items = await hydrate(page) - items.reverse() - const tail = page.at(-1) - return { - items, - more, - cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined, - } - }, + async (input) => pageSync(input), ) export const stream = fn(SessionID.zod, async function* (sessionID) { @@ -867,7 +878,7 @@ export namespace MessageV2 { } }) - export const parts = fn(MessageID.zod, async (message_id) => { + function partsSync(message_id: MessageID) { const rows = Database.use((db) => db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(), ) @@ -880,29 +891,83 @@ export namespace MessageV2 { messageID: row.message_id, }) as MessageV2.Part, ) - }) + } + + export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id)) + + function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts { + const row = Database.use((db) => + db + .select() + .from(MessageTable) + .where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID))) + .get(), + ) + if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` }) + return { + info: info(row), + parts: partsSync(input.messageID), + } + } export const get = fn( z.object({ sessionID: SessionID.zod, messageID: MessageID.zod, }), - async (input): Promise => { - const row = Database.use((db) => - db - .select() - .from(MessageTable) - .where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID))) - .get(), - ) - if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` }) - return { - info: info(row), - parts: await parts(input.messageID), - } - }, + async (input): Promise => getSync(input), ) + export const partsEffect = Effect.fnUntraced(function* (id: MessageID) { + return partsSync(id) + }) + + export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) { + return getSync(input) + }) + + export const pageEffect = Effect.fnUntraced(function* (input: { + sessionID: SessionID + limit: number + before?: string + }) { + return pageSync(input) + }) + + export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) { + const result: WithParts[] = [] + const size = 50 + let before: string | undefined + while (true) { + const next = pageSync({ sessionID, limit: size, before }) + if (next.items.length === 0) break + for (let i = next.items.length - 1; i >= 0; i--) { + result.push(next.items[i]) + } + if (!next.more || !next.cursor) break + before = next.cursor + } + return result + }) + + function applyCompactionFilter(msgs: MessageV2.WithParts[]) { + const result = [] as MessageV2.WithParts[] + const completed = new Set() + for (const msg of msgs) { + result.push(msg) + if ( + msg.info.role === "user" && + completed.has(msg.info.id) && + msg.parts.some((part) => part.type === "compaction") + ) + break + if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error) + completed.add(msg.info.parentID) + } + result.reverse() + return result + } + export async function filterCompacted(stream: AsyncIterable) { const result = [] as MessageV2.WithParts[] const completed = new Set() @@ -921,6 +986,10 @@ export namespace MessageV2 { return result } + export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) { + return applyCompactionFilter(yield* streamEffect(sessionID)) + }) + export function fromError( e: unknown, ctx: { providerID: ProviderID; aborted?: boolean }, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 2482e40fb3..f00fcaa30e 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -180,7 +180,7 @@ export namespace SessionProcessor { metadata: value.providerMetadata, } satisfies MessageV2.ToolPart) - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id) const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) if ( @@ -392,7 +392,7 @@ export namespace SessionProcessor { } ctx.reasoningMap = {} - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id) for (const part of parts) { if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue yield* session.updatePart({ diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index caeea88fb0..5162475d9c 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -213,7 +213,7 @@ export namespace SessionPrompt { (yield* provider.getModel(input.providerID, input.modelID))) const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] - : yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl)) + : yield* MessageV2.toModelMessagesEffect(context, mdl) const text = yield* Effect.promise(async (signal) => { const result = await LLM.stream({ agent: ag, @@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID))) + let msgs = yield* MessageV2.filterCompactedEffect(sessionID) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 1dd8b7edc9..9492c8d016 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () => } satisfies LLM.StreamInput const value = yield* handle.process(input) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = yield* MessageV2.partsEffect(msg.id) const calls = yield* llm.calls expect(value).toBe("continue") @@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = yield* MessageV2.partsEffect(msg.id) expect(value).toBe("compact") expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) @@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", ( tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = yield* MessageV2.partsEffect(msg.id) const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") expect(value).toBe("continue") @@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors" tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = yield* MessageV2.partsEffect(msg.id) expect(value).toBe("continue") expect(yield* llm.calls).toBe(2) @@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) { yield* handle.abort() } - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = yield* MessageV2.partsEffect(msg.id) const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") expect(Exit.isFailure(exit)).toBe(true) @@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( yield* handle.abort() } yield* Effect.promise(() => seen.promise) - const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) + const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) off() @@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) - const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) + const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) expect(Exit.isFailure(exit)).toBe(true) diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 28b4cf15cb..42eacffd1c 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () => expect(result.info.role).toBe("assistant") expect(yield* llm.calls).toBe(2) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const msgs = yield* MessageV2.filterCompactedEffect(chat.id) const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") expect(taskMsg?.info.role).toBe("assistant") if (!taskMsg || taskMsg.info.role !== "assistant") return @@ -628,7 +628,7 @@ it.live( const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const msgs = yield* MessageV2.filterCompactedEffect(chat.id) const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") expect(taskMsg?.info.role).toBe("assistant") if (!taskMsg || taskMsg.info.role !== "assistant") return