mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-14 08:32:33 +00:00
fix(session): type message list not found errors (#27275)
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import { Effect } from "effect"
|
||||
import { effectCmd } from "../effect-cmd"
|
||||
import { Session } from "@/session/session"
|
||||
import { NotFoundError } from "@/storage/storage"
|
||||
import { Database } from "@/storage/db"
|
||||
import { SessionTable } from "../../session/session.sql"
|
||||
import { Project } from "@/project/project"
|
||||
@@ -162,7 +163,9 @@ const aggregateSessionStats = Effect.fn("Cli.stats.aggregate")(function* (
|
||||
filteredSessions,
|
||||
(session) =>
|
||||
Effect.gen(function* () {
|
||||
const messages = yield* svc.messages({ sessionID: session.id })
|
||||
const messages = yield* svc.messages({ sessionID: session.id }).pipe(
|
||||
Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed([])),
|
||||
)
|
||||
|
||||
const sessionCost = session.cost ?? 0
|
||||
const sessionTokens = session.tokens ?? { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }
|
||||
|
||||
@@ -101,7 +101,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
|
||||
}
|
||||
yield* SessionError.mapStorageNotFound(session.get(ctx.params.sessionID))
|
||||
if (ctx.query.limit === undefined || ctx.query.limit === 0) {
|
||||
return yield* session.messages({ sessionID: ctx.params.sessionID })
|
||||
return yield* SessionError.mapStorageNotFound(session.messages({ sessionID: ctx.params.sessionID }))
|
||||
}
|
||||
|
||||
const page = yield* SessionError.mapStorageNotFound(
|
||||
@@ -250,7 +250,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
|
||||
payload: typeof SummarizePayload.Type
|
||||
}) {
|
||||
yield* revertSvc.cleanup(yield* SessionError.mapStorageNotFound(session.get(ctx.params.sessionID)))
|
||||
const messages = yield* session.messages({ sessionID: ctx.params.sessionID })
|
||||
const messages = yield* SessionError.mapStorageNotFound(session.messages({ sessionID: ctx.params.sessionID }))
|
||||
const defaultAgent = yield* agentSvc.defaultAgent()
|
||||
const currentAgent = messages.findLast((message) => message.info.role === "user")?.info.agent ?? defaultAgent
|
||||
|
||||
|
||||
@@ -565,7 +565,9 @@ export const layer: Layer.Layer<
|
||||
if (processor.message.error) return "stop"
|
||||
if (result === "continue") {
|
||||
const summary = summaryText(
|
||||
(yield* session.messages({ sessionID: input.sessionID })).find((item) => item.info.id === msg.id) ?? {
|
||||
(yield* session.messages({ sessionID: input.sessionID }).pipe(Effect.orDie)).find(
|
||||
(item) => item.info.id === msg.id,
|
||||
) ?? {
|
||||
info: msg,
|
||||
parts: [],
|
||||
},
|
||||
|
||||
@@ -1077,7 +1077,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
...(current.model.variant && current.model.variant !== "default" ? { variant: current.model.variant } : {}),
|
||||
}
|
||||
}
|
||||
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role === "user" && !!m.info.model)
|
||||
const match = yield* sessions
|
||||
.findMessage(sessionID, (m) => m.info.role === "user" && !!m.info.model)
|
||||
.pipe(Effect.orDie)
|
||||
if (Option.isSome(match) && match.value.info.role === "user") return match.value.info.model
|
||||
return yield* provider.defaultModel()
|
||||
})
|
||||
@@ -1615,9 +1617,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
)
|
||||
|
||||
const lastAssistant = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user")
|
||||
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user").pipe(Effect.orDie)
|
||||
if (Option.isSome(match)) return match.value
|
||||
const msgs = yield* sessions.messages({ sessionID, limit: 1 })
|
||||
const msgs = yield* sessions.messages({ sessionID, limit: 1 }).pipe(Effect.orDie)
|
||||
if (msgs.length > 0) return msgs[0]
|
||||
throw new Error("Impossible")
|
||||
})
|
||||
|
||||
@@ -40,7 +40,7 @@ export const layer = Layer.effect(
|
||||
|
||||
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
|
||||
yield* state.assertNotBusy(input.sessionID)
|
||||
const all = yield* sessions.messages({ sessionID: input.sessionID })
|
||||
const all = yield* sessions.messages({ sessionID: input.sessionID }).pipe(Effect.orDie)
|
||||
let lastUser: MessageV2.User | undefined
|
||||
const session = yield* sessions.get(input.sessionID).pipe(Effect.orDie)
|
||||
|
||||
@@ -103,7 +103,7 @@ export const layer = Layer.effect(
|
||||
const cleanup = Effect.fn("SessionRevert.cleanup")(function* (session: Session.Info) {
|
||||
if (!session.revert) return
|
||||
const sessionID = session.id
|
||||
const msgs = yield* sessions.messages({ sessionID })
|
||||
const msgs = yield* sessions.messages({ sessionID }).pipe(Effect.orDie)
|
||||
const messageID = session.revert.messageID
|
||||
const remove = [] as MessageV2.WithParts[]
|
||||
let target: MessageV2.WithParts | undefined
|
||||
|
||||
@@ -474,7 +474,7 @@ export interface Interface {
|
||||
readonly clearRevert: (sessionID: SessionID) => Effect.Effect<void>
|
||||
readonly setSummary: (input: { sessionID: SessionID; summary: Info["summary"] }) => Effect.Effect<void>
|
||||
readonly diff: (sessionID: SessionID) => Effect.Effect<Snapshot.FileDiff[]>
|
||||
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]>
|
||||
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[], NotFound>
|
||||
readonly children: (parentID: SessionID) => Effect.Effect<Info[]>
|
||||
readonly remove: (sessionID: SessionID) => Effect.Effect<void, NotFound>
|
||||
readonly updateMessage: <T extends MessageV2.Info>(msg: T) => Effect.Effect<T>
|
||||
@@ -497,7 +497,7 @@ export interface Interface {
|
||||
readonly findMessage: (
|
||||
sessionID: SessionID,
|
||||
predicate: (msg: MessageV2.WithParts) => boolean,
|
||||
) => Effect.Effect<Option.Option<MessageV2.WithParts>>
|
||||
) => Effect.Effect<Option.Option<MessageV2.WithParts>, NotFound>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
|
||||
@@ -757,11 +757,25 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
|
||||
.pipe(Effect.orElseSucceed((): Snapshot.FileDiff[] => []))
|
||||
})
|
||||
|
||||
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
|
||||
const messages: Interface["messages"] = Effect.fn("Session.messages")(function* (input) {
|
||||
if (input.limit) {
|
||||
return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items
|
||||
return (yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })).items
|
||||
}
|
||||
return Array.from(MessageV2.stream(input.sessionID)).reverse()
|
||||
|
||||
const size = 50
|
||||
const result = [] as MessageV2.WithParts[]
|
||||
let before: string | undefined
|
||||
while (true) {
|
||||
const page = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: size, before })
|
||||
if (page.items.length === 0) break
|
||||
for (let i = page.items.length - 1; i >= 0; i--) {
|
||||
const item = page.items[i]
|
||||
if (item) result.push(item)
|
||||
}
|
||||
if (!page.more || !page.cursor) break
|
||||
before = page.cursor
|
||||
}
|
||||
return result.reverse()
|
||||
})
|
||||
|
||||
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {
|
||||
@@ -799,12 +813,18 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
|
||||
})
|
||||
|
||||
/** Finds the first message matching the predicate, searching newest-first. */
|
||||
const findMessage = Effect.fn("Session.findMessage")(function* (
|
||||
sessionID: SessionID,
|
||||
predicate: (msg: MessageV2.WithParts) => boolean,
|
||||
) {
|
||||
for (const item of MessageV2.stream(sessionID)) {
|
||||
if (predicate(item)) return Option.some(item)
|
||||
const findMessage: Interface["findMessage"] = Effect.fn("Session.findMessage")(function* (sessionID, predicate) {
|
||||
const size = 50
|
||||
let before: string | undefined
|
||||
while (true) {
|
||||
const page = yield* MessageV2.pageEffect({ sessionID, limit: size, before })
|
||||
if (page.items.length === 0) break
|
||||
for (let i = page.items.length - 1; i >= 0; i--) {
|
||||
const item = page.items[i]
|
||||
if (item && predicate(item)) return Option.some(item)
|
||||
}
|
||||
if (!page.more || !page.cursor) break
|
||||
before = page.cursor
|
||||
}
|
||||
return Option.none<MessageV2.WithParts>()
|
||||
})
|
||||
|
||||
@@ -102,7 +102,7 @@ export const layer = Layer.effect(
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
}) {
|
||||
const all = yield* sessions.messages({ sessionID: input.sessionID })
|
||||
const all = yield* sessions.messages({ sessionID: input.sessionID }).pipe(Effect.orDie)
|
||||
if (!all.length) return
|
||||
|
||||
const diffs = yield* computeDiff({ messages: all })
|
||||
|
||||
@@ -168,7 +168,8 @@ function withContext<A, E>(
|
||||
)
|
||||
return { info, part }
|
||||
}),
|
||||
messages: (sessionID) => run(modules.Session.Service.use((svc) => svc.messages({ sessionID }))),
|
||||
messages: (sessionID) =>
|
||||
run(modules.Session.Service.use((svc) => svc.messages({ sessionID }).pipe(Effect.orDie))),
|
||||
todos: (sessionID, todos) => run(modules.Todo.Service.use((svc) => svc.update({ sessionID, todos }))),
|
||||
worktree: (input) => run(modules.Worktree.Service.use((svc) => svc.create(input))),
|
||||
worktreeRemove: (directory) =>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { Effect } from "effect"
|
||||
import { Effect, Option } from "effect"
|
||||
import { Session as SessionNs } from "@/session/session"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
|
||||
@@ -582,6 +582,50 @@ describe("MessageV2.get", () => {
|
||||
)
|
||||
})
|
||||
|
||||
describe("Session.messages", () => {
|
||||
it.instance("returns all messages in chronological order across pages", () =>
|
||||
withSession(({ session, sessionID }) =>
|
||||
Effect.gen(function* () {
|
||||
const ids = yield* fill(sessionID, 55)
|
||||
const result = yield* session.messages({ sessionID })
|
||||
expect(result.map((item) => item.info.id)).toEqual(ids)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.instance("fails with NotFoundError for non-existent session", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* SessionNs.Service
|
||||
const fake = "non-existent-session" as SessionID
|
||||
const error = yield* Effect.flip(session.messages({ sessionID: fake }))
|
||||
expect(error).toBeInstanceOf(NotFoundError)
|
||||
expect(error.message).toBe(`Session not found: ${fake}`)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("Session.findMessage", () => {
|
||||
it.instance("searches newest-first", () =>
|
||||
withSession(({ session, sessionID }) =>
|
||||
Effect.gen(function* () {
|
||||
const ids = yield* fill(sessionID, 3)
|
||||
const result = yield* session.findMessage(sessionID, () => true)
|
||||
expect(Option.isSome(result) ? result.value.info.id : undefined).toBe(ids.at(-1))
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
it.instance("fails with NotFoundError for non-existent session", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* SessionNs.Service
|
||||
const fake = "non-existent-session" as SessionID
|
||||
const error = yield* Effect.flip(session.findMessage(fake, () => true))
|
||||
expect(error).toBeInstanceOf(NotFoundError)
|
||||
expect(error.message).toBe(`Session not found: ${fake}`)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("MessageV2.filterCompacted", () => {
|
||||
it.instance("returns all messages when no compaction", () =>
|
||||
withSession(({ sessionID }) =>
|
||||
|
||||
Reference in New Issue
Block a user