diff --git a/packages/opencode/src/cli/cmd/stats.ts b/packages/opencode/src/cli/cmd/stats.ts index 3dadea9dd0..7dc1822605 100644 --- a/packages/opencode/src/cli/cmd/stats.ts +++ b/packages/opencode/src/cli/cmd/stats.ts @@ -1,6 +1,7 @@ import { Effect } from "effect" import { effectCmd } from "../effect-cmd" import { Session } from "@/session/session" +import { NotFoundError } from "@/storage/storage" import { Database } from "@/storage/db" import { SessionTable } from "../../session/session.sql" import { Project } from "@/project/project" @@ -162,7 +163,9 @@ const aggregateSessionStats = Effect.fn("Cli.stats.aggregate")(function* ( filteredSessions, (session) => Effect.gen(function* () { - const messages = yield* svc.messages({ sessionID: session.id }) + const messages = yield* svc.messages({ sessionID: session.id }).pipe( + Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed([])), + ) const sessionCost = session.cost ?? 0 const sessionTokens = session.tokens ?? { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } } diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts index 80bff42bc1..ebd0fcd4d8 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts @@ -101,7 +101,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session", } yield* SessionError.mapStorageNotFound(session.get(ctx.params.sessionID)) if (ctx.query.limit === undefined || ctx.query.limit === 0) { - return yield* session.messages({ sessionID: ctx.params.sessionID }) + return yield* SessionError.mapStorageNotFound(session.messages({ sessionID: ctx.params.sessionID })) } const page = yield* SessionError.mapStorageNotFound( @@ -250,7 +250,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session", payload: typeof SummarizePayload.Type }) { yield* revertSvc.cleanup(yield* SessionError.mapStorageNotFound(session.get(ctx.params.sessionID))) - const messages = yield* session.messages({ sessionID: ctx.params.sessionID }) + const messages = yield* SessionError.mapStorageNotFound(session.messages({ sessionID: ctx.params.sessionID })) const defaultAgent = yield* agentSvc.defaultAgent() const currentAgent = messages.findLast((message) => message.info.role === "user")?.info.agent ?? defaultAgent diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 4eafbdf749..f3c160fe73 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -565,7 +565,9 @@ export const layer: Layer.Layer< if (processor.message.error) return "stop" if (result === "continue") { const summary = summaryText( - (yield* session.messages({ sessionID: input.sessionID })).find((item) => item.info.id === msg.id) ?? { + (yield* session.messages({ sessionID: input.sessionID }).pipe(Effect.orDie)).find( + (item) => item.info.id === msg.id, + ) ?? { info: msg, parts: [], }, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index b89561d5d1..8656050faf 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -1077,7 +1077,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the ...(current.model.variant && current.model.variant !== "default" ? { variant: current.model.variant } : {}), } } - const match = yield* sessions.findMessage(sessionID, (m) => m.info.role === "user" && !!m.info.model) + const match = yield* sessions + .findMessage(sessionID, (m) => m.info.role === "user" && !!m.info.model) + .pipe(Effect.orDie) if (Option.isSome(match) && match.value.info.role === "user") return match.value.info.model return yield* provider.defaultModel() }) @@ -1615,9 +1617,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the ) const lastAssistant = Effect.fnUntraced(function* (sessionID: SessionID) { - const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user") + const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user").pipe(Effect.orDie) if (Option.isSome(match)) return match.value - const msgs = yield* sessions.messages({ sessionID, limit: 1 }) + const msgs = yield* sessions.messages({ sessionID, limit: 1 }).pipe(Effect.orDie) if (msgs.length > 0) return msgs[0] throw new Error("Impossible") }) diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts index ef9089c949..8ba7b265b9 100644 --- a/packages/opencode/src/session/revert.ts +++ b/packages/opencode/src/session/revert.ts @@ -40,7 +40,7 @@ export const layer = Layer.effect( const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) { yield* state.assertNotBusy(input.sessionID) - const all = yield* sessions.messages({ sessionID: input.sessionID }) + const all = yield* sessions.messages({ sessionID: input.sessionID }).pipe(Effect.orDie) let lastUser: MessageV2.User | undefined const session = yield* sessions.get(input.sessionID).pipe(Effect.orDie) @@ -103,7 +103,7 @@ export const layer = Layer.effect( const cleanup = Effect.fn("SessionRevert.cleanup")(function* (session: Session.Info) { if (!session.revert) return const sessionID = session.id - const msgs = yield* sessions.messages({ sessionID }) + const msgs = yield* sessions.messages({ sessionID }).pipe(Effect.orDie) const messageID = session.revert.messageID const remove = [] as MessageV2.WithParts[] let target: MessageV2.WithParts | undefined diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index 0913b85687..d82b2369e1 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -474,7 +474,7 @@ export interface Interface { readonly clearRevert: (sessionID: SessionID) => Effect.Effect readonly setSummary: (input: { sessionID: SessionID; summary: Info["summary"] }) => Effect.Effect readonly diff: (sessionID: SessionID) => Effect.Effect - readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect + readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect readonly children: (parentID: SessionID) => Effect.Effect readonly remove: (sessionID: SessionID) => Effect.Effect readonly updateMessage: (msg: T) => Effect.Effect @@ -497,7 +497,7 @@ export interface Interface { readonly findMessage: ( sessionID: SessionID, predicate: (msg: MessageV2.WithParts) => boolean, - ) => Effect.Effect> + ) => Effect.Effect, NotFound> } export class Service extends Context.Service()("@opencode/Session") {} @@ -757,11 +757,25 @@ export const layer: Layer.Layer [])) }) - const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) { + const messages: Interface["messages"] = Effect.fn("Session.messages")(function* (input) { if (input.limit) { - return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items + return (yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })).items } - return Array.from(MessageV2.stream(input.sessionID)).reverse() + + const size = 50 + const result = [] as MessageV2.WithParts[] + let before: string | undefined + while (true) { + const page = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: size, before }) + if (page.items.length === 0) break + for (let i = page.items.length - 1; i >= 0; i--) { + const item = page.items[i] + if (item) result.push(item) + } + if (!page.more || !page.cursor) break + before = page.cursor + } + return result.reverse() }) const removeMessage = Effect.fn("Session.removeMessage")(function* (input: { @@ -799,12 +813,18 @@ export const layer: Layer.Layer boolean, - ) { - for (const item of MessageV2.stream(sessionID)) { - if (predicate(item)) return Option.some(item) + const findMessage: Interface["findMessage"] = Effect.fn("Session.findMessage")(function* (sessionID, predicate) { + const size = 50 + let before: string | undefined + while (true) { + const page = yield* MessageV2.pageEffect({ sessionID, limit: size, before }) + if (page.items.length === 0) break + for (let i = page.items.length - 1; i >= 0; i--) { + const item = page.items[i] + if (item && predicate(item)) return Option.some(item) + } + if (!page.more || !page.cursor) break + before = page.cursor } return Option.none() }) diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts index c03fc32695..aa4b8719bc 100644 --- a/packages/opencode/src/session/summary.ts +++ b/packages/opencode/src/session/summary.ts @@ -102,7 +102,7 @@ export const layer = Layer.effect( sessionID: SessionID messageID: MessageID }) { - const all = yield* sessions.messages({ sessionID: input.sessionID }) + const all = yield* sessions.messages({ sessionID: input.sessionID }).pipe(Effect.orDie) if (!all.length) return const diffs = yield* computeDiff({ messages: all }) diff --git a/packages/opencode/test/server/httpapi-exercise/runner.ts b/packages/opencode/test/server/httpapi-exercise/runner.ts index bc246dbeda..7ab5ccf990 100644 --- a/packages/opencode/test/server/httpapi-exercise/runner.ts +++ b/packages/opencode/test/server/httpapi-exercise/runner.ts @@ -168,7 +168,8 @@ function withContext( ) return { info, part } }), - messages: (sessionID) => run(modules.Session.Service.use((svc) => svc.messages({ sessionID }))), + messages: (sessionID) => + run(modules.Session.Service.use((svc) => svc.messages({ sessionID }).pipe(Effect.orDie))), todos: (sessionID, todos) => run(modules.Todo.Service.use((svc) => svc.update({ sessionID, todos }))), worktree: (input) => run(modules.Worktree.Service.use((svc) => svc.create(input))), worktreeRemove: (directory) => diff --git a/packages/opencode/test/session/messages-pagination.test.ts b/packages/opencode/test/session/messages-pagination.test.ts index 8332bb9b3d..247f95d280 100644 --- a/packages/opencode/test/session/messages-pagination.test.ts +++ b/packages/opencode/test/session/messages-pagination.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from "bun:test" -import { Effect } from "effect" +import { Effect, Option } from "effect" import { Session as SessionNs } from "@/session/session" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, type SessionID } from "../../src/session/schema" @@ -582,6 +582,50 @@ describe("MessageV2.get", () => { ) }) +describe("Session.messages", () => { + it.instance("returns all messages in chronological order across pages", () => + withSession(({ session, sessionID }) => + Effect.gen(function* () { + const ids = yield* fill(sessionID, 55) + const result = yield* session.messages({ sessionID }) + expect(result.map((item) => item.info.id)).toEqual(ids) + }), + ), + ) + + it.instance("fails with NotFoundError for non-existent session", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const fake = "non-existent-session" as SessionID + const error = yield* Effect.flip(session.messages({ sessionID: fake })) + expect(error).toBeInstanceOf(NotFoundError) + expect(error.message).toBe(`Session not found: ${fake}`) + }), + ) +}) + +describe("Session.findMessage", () => { + it.instance("searches newest-first", () => + withSession(({ session, sessionID }) => + Effect.gen(function* () { + const ids = yield* fill(sessionID, 3) + const result = yield* session.findMessage(sessionID, () => true) + expect(Option.isSome(result) ? result.value.info.id : undefined).toBe(ids.at(-1)) + }), + ), + ) + + it.instance("fails with NotFoundError for non-existent session", () => + Effect.gen(function* () { + const session = yield* SessionNs.Service + const fake = "non-existent-session" as SessionID + const error = yield* Effect.flip(session.findMessage(fake, () => true)) + expect(error).toBeInstanceOf(NotFoundError) + expect(error.message).toBe(`Session not found: ${fake}`) + }), + ) +}) + describe("MessageV2.filterCompacted", () => { it.instance("returns all messages when no compaction", () => withSession(({ sessionID }) =>