Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
af3f77f50d refactor: add Effect-returning versions of MessageV2 functions
Extract sync cores (partsSync, getSync, pageSync) from MessageV2 DB
functions and add Effect.fn wrappers: partsEffect, getEffect, pageEffect,
streamEffect, filterCompactedEffect, toModelMessagesEffect.

Replace all Effect.promise(() => MessageV2.*) call sites in processor,
prompt, compaction, session index, and tests with direct yield* calls.
2026-03-31 21:32:22 -04:00
7 changed files with 143 additions and 76 deletions

View File

@@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
const ctx = yield* InstanceState.context
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),

View File

@@ -593,15 +593,13 @@ export namespace Session {
})
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
return yield* Effect.promise(async () => {
const result = [] as MessageV2.WithParts[]
for await (const msg of MessageV2.stream(input.sessionID)) {
if (input.limit && result.length >= input.limit) break
result.push(msg)
}
result.reverse()
return result
})
if (input.limit) {
const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })
return result.items
}
const all = yield* MessageV2.streamEffect(input.sessionID)
all.reverse()
return all
})
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {

View File

@@ -15,6 +15,7 @@ import { errorMessage } from "@/util/error"
import type { SystemError } from "bun"
import type { Provider } from "@/provider/provider"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect } from "effect"
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
interface FetchDecompressionError extends Error {
@@ -547,7 +548,7 @@ export namespace MessageV2 {
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
)
async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
const ids = rows.map((row) => row.id)
const partByMessage = new Map<string, MessageV2.Part[]>()
if (ids.length > 0) {
@@ -809,48 +810,58 @@ export namespace MessageV2 {
)
}
export const toModelMessagesEffect = Effect.fnUntraced(function* (
input: WithParts[],
model: Provider.Model,
options?: { stripMedia?: boolean },
) {
return yield* Effect.promise(() => toModelMessages(input, model, options))
})
function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as MessageV2.WithParts[],
more: false,
}
}
const more = rows.length > input.limit
const slice = more ? rows.slice(0, input.limit) : rows
const items = hydrate(slice)
items.reverse()
const tail = slice.at(-1)
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
}
export const page = fn(
z.object({
sessionID: SessionID.zod,
limit: z.number().int().positive(),
before: z.string().optional(),
}),
async (input) => {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as MessageV2.WithParts[],
more: false,
}
}
const more = rows.length > input.limit
const page = more ? rows.slice(0, input.limit) : rows
const items = await hydrate(page)
items.reverse()
const tail = page.at(-1)
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
},
async (input) => pageSync(input),
)
export const stream = fn(SessionID.zod, async function* (sessionID) {
@@ -867,7 +878,7 @@ export namespace MessageV2 {
}
})
export const parts = fn(MessageID.zod, async (message_id) => {
function partsSync(message_id: MessageID) {
const rows = Database.use((db) =>
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
)
@@ -880,29 +891,83 @@ export namespace MessageV2 {
messageID: row.message_id,
}) as MessageV2.Part,
)
})
}
export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id))
function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: partsSync(input.messageID),
}
}
export const get = fn(
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
}),
async (input): Promise<WithParts> => {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: await parts(input.messageID),
}
},
async (input): Promise<WithParts> => getSync(input),
)
export const partsEffect = Effect.fnUntraced(function* (id: MessageID) {
return partsSync(id)
})
export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) {
return getSync(input)
})
export const pageEffect = Effect.fnUntraced(function* (input: {
sessionID: SessionID
limit: number
before?: string
}) {
return pageSync(input)
})
export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
const result: WithParts[] = []
const size = 50
let before: string | undefined
while (true) {
const next = pageSync({ sessionID, limit: size, before })
if (next.items.length === 0) break
for (let i = next.items.length - 1; i >= 0; i--) {
result.push(next.items[i])
}
if (!next.more || !next.cursor) break
before = next.cursor
}
return result
})
function applyCompactionFilter(msgs: MessageV2.WithParts[]) {
const result = [] as MessageV2.WithParts[]
const completed = new Set<string>()
for (const msg of msgs) {
result.push(msg)
if (
msg.info.role === "user" &&
completed.has(msg.info.id) &&
msg.parts.some((part) => part.type === "compaction")
)
break
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
completed.add(msg.info.parentID)
}
result.reverse()
return result
}
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
const result = [] as MessageV2.WithParts[]
const completed = new Set<string>()
@@ -921,6 +986,10 @@ export namespace MessageV2 {
return result
}
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
return applyCompactionFilter(yield* streamEffect(sessionID))
})
export function fromError(
e: unknown,
ctx: { providerID: ProviderID; aborted?: boolean },

View File

@@ -180,7 +180,7 @@ export namespace SessionProcessor {
metadata: value.providerMetadata,
} satisfies MessageV2.ToolPart)
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
@@ -392,7 +392,7 @@ export namespace SessionProcessor {
}
ctx.reasoningMap = {}
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
for (const part of parts) {
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
yield* session.updatePart({

View File

@@ -213,7 +213,7 @@ export namespace SessionPrompt {
(yield* provider.getModel(input.providerID, input.modelID)))
const msgs = onlySubtasks
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
: yield* MessageV2.toModelMessagesEffect(context, mdl)
const text = yield* Effect.promise(async (signal) => {
const result = await LLM.stream({
agent: ag,
@@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* status.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })
let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
let lastUser: MessageV2.User | undefined
let lastAssistant: MessageV2.Assistant | undefined

View File

@@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
} satisfies LLM.StreamInput
const value = yield* handle.process(input)
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const calls = yield* llm.calls
expect(value).toBe("continue")
@@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
expect(value).toBe("compact")
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
@@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
expect(value).toBe("continue")
@@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
expect(value).toBe("continue")
expect(yield* llm.calls).toBe(2)
@@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = yield* MessageV2.partsEffect(msg.id)
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
expect(Exit.isFailure(exit)).toBe(true)
@@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
yield* handle.abort()
}
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
off()
@@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
expect(Exit.isFailure(exit)).toBe(true)

View File

@@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
expect(result.info.role).toBe("assistant")
expect(yield* llm.calls).toBe(2)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
@@ -628,7 +628,7 @@ it.live(
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return