mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-01 11:34:34 +00:00
Compare commits
1 Commits
worktree-c
...
worktree-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af3f77f50d |
@@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
|
||||
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
|
||||
const msgs = structuredClone(messages)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
|
||||
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
|
||||
const ctx = yield* InstanceState.context
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
|
||||
@@ -593,15 +593,13 @@ export namespace Session {
|
||||
})
|
||||
|
||||
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
|
||||
return yield* Effect.promise(async () => {
|
||||
const result = [] as MessageV2.WithParts[]
|
||||
for await (const msg of MessageV2.stream(input.sessionID)) {
|
||||
if (input.limit && result.length >= input.limit) break
|
||||
result.push(msg)
|
||||
}
|
||||
result.reverse()
|
||||
return result
|
||||
})
|
||||
if (input.limit) {
|
||||
const result = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })
|
||||
return result.items
|
||||
}
|
||||
const all = yield* MessageV2.streamEffect(input.sessionID)
|
||||
all.reverse()
|
||||
return all
|
||||
})
|
||||
|
||||
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {
|
||||
|
||||
@@ -15,6 +15,7 @@ import { errorMessage } from "@/util/error"
|
||||
import type { SystemError } from "bun"
|
||||
import type { Provider } from "@/provider/provider"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Effect } from "effect"
|
||||
|
||||
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
|
||||
interface FetchDecompressionError extends Error {
|
||||
@@ -547,7 +548,7 @@ export namespace MessageV2 {
|
||||
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
|
||||
)
|
||||
|
||||
async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
|
||||
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
|
||||
const ids = rows.map((row) => row.id)
|
||||
const partByMessage = new Map<string, MessageV2.Part[]>()
|
||||
if (ids.length > 0) {
|
||||
@@ -809,48 +810,58 @@ export namespace MessageV2 {
|
||||
)
|
||||
}
|
||||
|
||||
export const toModelMessagesEffect = Effect.fnUntraced(function* (
|
||||
input: WithParts[],
|
||||
model: Provider.Model,
|
||||
options?: { stripMedia?: boolean },
|
||||
) {
|
||||
return yield* Effect.promise(() => toModelMessages(input, model, options))
|
||||
})
|
||||
|
||||
function pageSync(input: { sessionID: SessionID; limit: number; before?: string }) {
|
||||
const before = input.before ? cursor.decode(input.before) : undefined
|
||||
const where = before
|
||||
? and(eq(MessageTable.session_id, input.sessionID), older(before))
|
||||
: eq(MessageTable.session_id, input.sessionID)
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(MessageTable)
|
||||
.where(where)
|
||||
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
|
||||
.limit(input.limit + 1)
|
||||
.all(),
|
||||
)
|
||||
if (rows.length === 0) {
|
||||
const row = Database.use((db) =>
|
||||
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
|
||||
)
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
return {
|
||||
items: [] as MessageV2.WithParts[],
|
||||
more: false,
|
||||
}
|
||||
}
|
||||
|
||||
const more = rows.length > input.limit
|
||||
const slice = more ? rows.slice(0, input.limit) : rows
|
||||
const items = hydrate(slice)
|
||||
items.reverse()
|
||||
const tail = slice.at(-1)
|
||||
return {
|
||||
items,
|
||||
more,
|
||||
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
|
||||
}
|
||||
}
|
||||
|
||||
export const page = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
limit: z.number().int().positive(),
|
||||
before: z.string().optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
const before = input.before ? cursor.decode(input.before) : undefined
|
||||
const where = before
|
||||
? and(eq(MessageTable.session_id, input.sessionID), older(before))
|
||||
: eq(MessageTable.session_id, input.sessionID)
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(MessageTable)
|
||||
.where(where)
|
||||
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
|
||||
.limit(input.limit + 1)
|
||||
.all(),
|
||||
)
|
||||
if (rows.length === 0) {
|
||||
const row = Database.use((db) =>
|
||||
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
|
||||
)
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
return {
|
||||
items: [] as MessageV2.WithParts[],
|
||||
more: false,
|
||||
}
|
||||
}
|
||||
|
||||
const more = rows.length > input.limit
|
||||
const page = more ? rows.slice(0, input.limit) : rows
|
||||
const items = await hydrate(page)
|
||||
items.reverse()
|
||||
const tail = page.at(-1)
|
||||
return {
|
||||
items,
|
||||
more,
|
||||
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
|
||||
}
|
||||
},
|
||||
async (input) => pageSync(input),
|
||||
)
|
||||
|
||||
export const stream = fn(SessionID.zod, async function* (sessionID) {
|
||||
@@ -867,7 +878,7 @@ export namespace MessageV2 {
|
||||
}
|
||||
})
|
||||
|
||||
export const parts = fn(MessageID.zod, async (message_id) => {
|
||||
function partsSync(message_id: MessageID) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
|
||||
)
|
||||
@@ -880,29 +891,83 @@ export namespace MessageV2 {
|
||||
messageID: row.message_id,
|
||||
}) as MessageV2.Part,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
export const parts = fn(MessageID.zod, async (message_id) => partsSync(message_id))
|
||||
|
||||
function getSync(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(MessageTable)
|
||||
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
|
||||
.get(),
|
||||
)
|
||||
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
|
||||
return {
|
||||
info: info(row),
|
||||
parts: partsSync(input.messageID),
|
||||
}
|
||||
}
|
||||
|
||||
export const get = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
}),
|
||||
async (input): Promise<WithParts> => {
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(MessageTable)
|
||||
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
|
||||
.get(),
|
||||
)
|
||||
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
|
||||
return {
|
||||
info: info(row),
|
||||
parts: await parts(input.messageID),
|
||||
}
|
||||
},
|
||||
async (input): Promise<WithParts> => getSync(input),
|
||||
)
|
||||
|
||||
export const partsEffect = Effect.fnUntraced(function* (id: MessageID) {
|
||||
return partsSync(id)
|
||||
})
|
||||
|
||||
export const getEffect = Effect.fnUntraced(function* (input: { sessionID: SessionID; messageID: MessageID }) {
|
||||
return getSync(input)
|
||||
})
|
||||
|
||||
export const pageEffect = Effect.fnUntraced(function* (input: {
|
||||
sessionID: SessionID
|
||||
limit: number
|
||||
before?: string
|
||||
}) {
|
||||
return pageSync(input)
|
||||
})
|
||||
|
||||
export const streamEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const result: WithParts[] = []
|
||||
const size = 50
|
||||
let before: string | undefined
|
||||
while (true) {
|
||||
const next = pageSync({ sessionID, limit: size, before })
|
||||
if (next.items.length === 0) break
|
||||
for (let i = next.items.length - 1; i >= 0; i--) {
|
||||
result.push(next.items[i])
|
||||
}
|
||||
if (!next.more || !next.cursor) break
|
||||
before = next.cursor
|
||||
}
|
||||
return result
|
||||
})
|
||||
|
||||
function applyCompactionFilter(msgs: MessageV2.WithParts[]) {
|
||||
const result = [] as MessageV2.WithParts[]
|
||||
const completed = new Set<string>()
|
||||
for (const msg of msgs) {
|
||||
result.push(msg)
|
||||
if (
|
||||
msg.info.role === "user" &&
|
||||
completed.has(msg.info.id) &&
|
||||
msg.parts.some((part) => part.type === "compaction")
|
||||
)
|
||||
break
|
||||
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
|
||||
completed.add(msg.info.parentID)
|
||||
}
|
||||
result.reverse()
|
||||
return result
|
||||
}
|
||||
|
||||
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
|
||||
const result = [] as MessageV2.WithParts[]
|
||||
const completed = new Set<string>()
|
||||
@@ -921,6 +986,10 @@ export namespace MessageV2 {
|
||||
return result
|
||||
}
|
||||
|
||||
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
return applyCompactionFilter(yield* streamEffect(sessionID))
|
||||
})
|
||||
|
||||
export function fromError(
|
||||
e: unknown,
|
||||
ctx: { providerID: ProviderID; aborted?: boolean },
|
||||
|
||||
@@ -180,7 +180,7 @@ export namespace SessionProcessor {
|
||||
metadata: value.providerMetadata,
|
||||
} satisfies MessageV2.ToolPart)
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
|
||||
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
|
||||
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
|
||||
|
||||
if (
|
||||
@@ -392,7 +392,7 @@ export namespace SessionProcessor {
|
||||
}
|
||||
ctx.reasoningMap = {}
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
|
||||
const parts = yield* MessageV2.partsEffect(ctx.assistantMessage.id)
|
||||
for (const part of parts) {
|
||||
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
|
||||
yield* session.updatePart({
|
||||
|
||||
@@ -213,7 +213,7 @@ export namespace SessionPrompt {
|
||||
(yield* provider.getModel(input.providerID, input.modelID)))
|
||||
const msgs = onlySubtasks
|
||||
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
|
||||
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
|
||||
: yield* MessageV2.toModelMessagesEffect(context, mdl)
|
||||
const text = yield* Effect.promise(async (signal) => {
|
||||
const result = await LLM.stream({
|
||||
agent: ag,
|
||||
@@ -1360,7 +1360,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
yield* status.set(sessionID, { type: "busy" })
|
||||
log.info("loop", { step, sessionID })
|
||||
|
||||
let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
|
||||
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
|
||||
|
||||
let lastUser: MessageV2.User | undefined
|
||||
let lastAssistant: MessageV2.Assistant | undefined
|
||||
|
||||
@@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
|
||||
} satisfies LLM.StreamInput
|
||||
|
||||
const value = yield* handle.process(input)
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const parts = yield* MessageV2.partsEffect(msg.id)
|
||||
const calls = yield* llm.calls
|
||||
|
||||
expect(value).toBe("continue")
|
||||
@@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
|
||||
tools: {},
|
||||
})
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const parts = yield* MessageV2.partsEffect(msg.id)
|
||||
|
||||
expect(value).toBe("compact")
|
||||
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
|
||||
@@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
|
||||
tools: {},
|
||||
})
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const parts = yield* MessageV2.partsEffect(msg.id)
|
||||
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
|
||||
|
||||
expect(value).toBe("continue")
|
||||
@@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
|
||||
tools: {},
|
||||
})
|
||||
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const parts = yield* MessageV2.partsEffect(msg.id)
|
||||
|
||||
expect(value).toBe("continue")
|
||||
expect(yield* llm.calls).toBe(2)
|
||||
@@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
|
||||
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
|
||||
yield* handle.abort()
|
||||
}
|
||||
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
|
||||
const parts = yield* MessageV2.partsEffect(msg.id)
|
||||
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
@@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
|
||||
yield* handle.abort()
|
||||
}
|
||||
yield* Effect.promise(() => seen.promise)
|
||||
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
|
||||
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
|
||||
const state = yield* sts.get(chat.id)
|
||||
off()
|
||||
|
||||
@@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
|
||||
yield* Fiber.interrupt(run)
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
|
||||
const stored = yield* MessageV2.getEffect({ sessionID: chat.id, messageID: msg.id })
|
||||
const state = yield* sts.get(chat.id)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
|
||||
@@ -469,7 +469,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
|
||||
expect(result.info.role).toBe("assistant")
|
||||
expect(yield* llm.calls).toBe(2)
|
||||
|
||||
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
|
||||
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
|
||||
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
|
||||
expect(taskMsg?.info.role).toBe("assistant")
|
||||
if (!taskMsg || taskMsg.info.role !== "assistant") return
|
||||
@@ -628,7 +628,7 @@ it.live(
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
|
||||
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
|
||||
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
|
||||
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
|
||||
expect(taskMsg?.info.role).toBe("assistant")
|
||||
if (!taskMsg || taskMsg.info.role !== "assistant") return
|
||||
|
||||
Reference in New Issue
Block a user