mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-18 12:44:29 +00:00
Compare commits
2 Commits
beta
...
jlongster/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cd81cf9d67 | ||
|
|
4e51c3611a |
@@ -0,0 +1,12 @@
|
||||
CREATE TABLE `event_sequence` (
|
||||
`aggregate_id` text PRIMARY KEY,
|
||||
`seq` integer NOT NULL
|
||||
);
|
||||
--> statement-breakpoint
|
||||
CREATE TABLE `event` (
|
||||
`seq` integer NOT NULL,
|
||||
`aggregateId` text NOT NULL,
|
||||
`name` text NOT NULL,
|
||||
`data` text NOT NULL,
|
||||
CONSTRAINT `fk_event_aggregateId_event_sequence_aggregate_id_fk` FOREIGN KEY (`aggregateId`) REFERENCES `event_sequence`(`aggregate_id`) ON DELETE CASCADE
|
||||
);
|
||||
1306
packages/opencode/migration/20260317175210_events/snapshot.json
Normal file
1306
packages/opencode/migration/20260317175210_events/snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,10 +1,7 @@
|
||||
import z from "zod"
|
||||
import type { ZodType } from "zod"
|
||||
import { Log } from "../util/log"
|
||||
|
||||
export namespace BusEvent {
|
||||
const log = Log.create({ service: "event" })
|
||||
|
||||
export type Definition = ReturnType<typeof define>
|
||||
|
||||
const registry = new Map<string, Definition>()
|
||||
@@ -18,6 +15,10 @@ export namespace BusEvent {
|
||||
return result
|
||||
}
|
||||
|
||||
export function lookup(type: string) {
|
||||
return registry.get(type)
|
||||
}
|
||||
|
||||
export function payloads() {
|
||||
return z
|
||||
.discriminatedUnion(
|
||||
|
||||
@@ -889,10 +889,10 @@ export const GithubRunCommand = cmd({
|
||||
}
|
||||
|
||||
let text = ""
|
||||
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
|
||||
if (evt.properties.part.sessionID !== session.id) return
|
||||
Bus.subscribe(MessageV2.Event.PartUpdated, (evt) => {
|
||||
if (evt.properties.data.part.sessionID !== session.id) return
|
||||
//if (evt.properties.part.messageID === messageID) return
|
||||
const part = evt.properties.part
|
||||
const part = evt.properties.data.part
|
||||
|
||||
if (part.type === "tool" && part.state.status === "completed") {
|
||||
const [tool, color] = TOOL[part.tool] ?? [part.tool, UI.Style.TEXT_INFO_BOLD]
|
||||
|
||||
@@ -90,6 +90,10 @@ export namespace Installation {
|
||||
return CHANNEL === "local"
|
||||
}
|
||||
|
||||
export function isTesting() {
|
||||
return process.env.NODE_ENV === "test"
|
||||
}
|
||||
|
||||
export async function method() {
|
||||
if (process.execPath.includes(path.join(".opencode", "bin"))) return "curl"
|
||||
if (process.execPath.includes(path.join(".local", "bin"))) return "curl"
|
||||
|
||||
@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
|
||||
import z from "zod"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { DatabaseEvent } from "@/storage/event"
|
||||
import { Instance } from "../../project/instance"
|
||||
import { Installation } from "@/installation"
|
||||
import { Log } from "../../util/log"
|
||||
|
||||
@@ -276,14 +276,14 @@ export const SessionRoutes = lazy(() =>
|
||||
const sessionID = c.req.valid("param").sessionID
|
||||
const updates = c.req.valid("json")
|
||||
|
||||
let session = await Session.get(sessionID)
|
||||
if (updates.title !== undefined) {
|
||||
session = await Session.setTitle({ sessionID, title: updates.title })
|
||||
await Session.setTitle({ sessionID, title: updates.title })
|
||||
}
|
||||
if (updates.time?.archived !== undefined) {
|
||||
session = await Session.setArchived({ sessionID, time: updates.time.archived })
|
||||
await Session.setArchived({ sessionID, time: updates.time.archived })
|
||||
}
|
||||
|
||||
const session = await Session.get(sessionID)
|
||||
return c.json(session)
|
||||
},
|
||||
)
|
||||
|
||||
@@ -10,9 +10,10 @@ import { Flag } from "../flag/flag"
|
||||
import { Identifier } from "../id/id"
|
||||
import { Installation } from "../installation"
|
||||
|
||||
import { Database, NotFoundError, eq, and, or, gte, isNull, desc, like, inArray, lt } from "../storage/db"
|
||||
import { Database, NotFoundError, eq, and, gte, isNull, desc, like, inArray, lt } from "../storage/db"
|
||||
import { DatabaseEvent } from "../storage/event"
|
||||
import type { SQL } from "../storage/db"
|
||||
import { SessionTable, MessageTable, PartTable } from "./session.sql"
|
||||
import { SessionTable } from "./session.sql"
|
||||
import { ProjectTable } from "../project/project.sql"
|
||||
import { Storage } from "@/storage/storage"
|
||||
import { Log } from "../util/log"
|
||||
@@ -179,21 +180,38 @@ export namespace Session {
|
||||
export type GlobalInfo = z.output<typeof GlobalInfo>
|
||||
|
||||
export const Event = {
|
||||
Created: BusEvent.define(
|
||||
Created: DatabaseEvent.define(
|
||||
"session.created",
|
||||
"v1",
|
||||
z.object({
|
||||
id: z.string(),
|
||||
info: Info,
|
||||
}),
|
||||
),
|
||||
Updated: BusEvent.define(
|
||||
Shared: DatabaseEvent.define(
|
||||
"session.shared",
|
||||
"v1",
|
||||
z.object({
|
||||
id: z.string(),
|
||||
url: z.string().optional(),
|
||||
}),
|
||||
),
|
||||
Touch: DatabaseEvent.define("session.touch", "v1", z.object({ id: z.string(), time: z.number() })),
|
||||
Updated: DatabaseEvent.define(
|
||||
"session.updated",
|
||||
"v1",
|
||||
z.object({
|
||||
info: Info,
|
||||
id: z.string(),
|
||||
info: Info.partial().extend({
|
||||
time: Info.shape.time.partial().optional(),
|
||||
}),
|
||||
}),
|
||||
),
|
||||
Deleted: BusEvent.define(
|
||||
Deleted: DatabaseEvent.define(
|
||||
"session.deleted",
|
||||
"v1",
|
||||
z.object({
|
||||
id: z.string(),
|
||||
info: Info,
|
||||
}),
|
||||
),
|
||||
@@ -277,18 +295,8 @@ export namespace Session {
|
||||
)
|
||||
|
||||
export const touch = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
const now = Date.now()
|
||||
Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ time_updated: now })
|
||||
.where(eq(SessionTable.id, sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
const time = Date.now()
|
||||
DatabaseEvent.run(Event.Touch, { id: sessionID, time })
|
||||
})
|
||||
|
||||
export async function createNext(input: {
|
||||
@@ -315,22 +323,18 @@ export namespace Session {
|
||||
},
|
||||
}
|
||||
log.info("created", result)
|
||||
Database.use((db) => {
|
||||
db.insert(SessionTable).values(toRow(result)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(Event.Created, {
|
||||
info: result,
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
DatabaseEvent.run(Event.Created, { id: result.id, info: result })
|
||||
|
||||
const cfg = await Config.get()
|
||||
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto"))
|
||||
share(result.id).catch(() => {
|
||||
// Silently ignore sharing errors during session creation
|
||||
})
|
||||
Bus.publish(Event.Updated, {
|
||||
info: result,
|
||||
})
|
||||
// Bus.publish(Event.Updated, {
|
||||
// id: result.id,
|
||||
// info: result,
|
||||
// })
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -354,12 +358,9 @@ export namespace Session {
|
||||
}
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
const share = await ShareNext.create(id)
|
||||
Database.use((db) => {
|
||||
const row = db.update(SessionTable).set({ share_url: share.url }).where(eq(SessionTable.id, id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
|
||||
DatabaseEvent.run(Event.Shared, { id, url: share.url })
|
||||
|
||||
return share
|
||||
})
|
||||
|
||||
@@ -367,12 +368,8 @@ export namespace Session {
|
||||
// Use ShareNext to remove the share (same as share function uses ShareNext to create)
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
await ShareNext.remove(id)
|
||||
Database.use((db) => {
|
||||
const row = db.update(SessionTable).set({ share_url: null }).where(eq(SessionTable.id, id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
|
||||
DatabaseEvent.run(Event.Shared, { id, url: undefined })
|
||||
})
|
||||
|
||||
export const setTitle = fn(
|
||||
@@ -381,18 +378,7 @@ export namespace Session {
|
||||
title: z.string(),
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ title: input.title })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
DatabaseEvent.run(Event.Updated, { id: input.sessionID, info: { title: input.title } })
|
||||
},
|
||||
)
|
||||
|
||||
@@ -402,18 +388,7 @@ export namespace Session {
|
||||
time: z.number().optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ time_archived: input.time })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
DatabaseEvent.run(Event.Updated, { id: input.sessionID, info: { time: { archived: input.time } } })
|
||||
},
|
||||
)
|
||||
|
||||
@@ -423,17 +398,9 @@ export namespace Session {
|
||||
permission: PermissionNext.Ruleset,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ permission: input.permission, time_updated: Date.now() })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
DatabaseEvent.run(Event.Updated, {
|
||||
id: input.sessionID,
|
||||
info: { permission: input.permission, time: { updated: Date.now() } },
|
||||
})
|
||||
},
|
||||
)
|
||||
@@ -445,42 +412,24 @@ export namespace Session {
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
revert: input.revert ?? null,
|
||||
summary_additions: input.summary?.additions,
|
||||
summary_deletions: input.summary?.deletions,
|
||||
summary_files: input.summary?.files,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
DatabaseEvent.run(Event.Updated, {
|
||||
id: input.sessionID,
|
||||
info: {
|
||||
summary: input.summary,
|
||||
time: { updated: Date.now() },
|
||||
revert: input.revert,
|
||||
},
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const clearRevert = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
revert: null,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
DatabaseEvent.run(Event.Updated, {
|
||||
id: sessionID,
|
||||
info: {
|
||||
time: { updated: Date.now() },
|
||||
revert: undefined,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
@@ -490,22 +439,12 @@ export namespace Session {
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
summary_additions: input.summary?.additions,
|
||||
summary_deletions: input.summary?.deletions,
|
||||
summary_files: input.summary?.files,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
DatabaseEvent.run(Event.Updated, {
|
||||
id: input.sessionID,
|
||||
info: {
|
||||
time: { updated: Date.now() },
|
||||
summary: input.summary,
|
||||
},
|
||||
})
|
||||
},
|
||||
)
|
||||
@@ -659,46 +598,25 @@ export namespace Session {
|
||||
})
|
||||
|
||||
export const remove = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
const project = Instance.project
|
||||
try {
|
||||
const session = await get(sessionID)
|
||||
for (const child of await children(sessionID)) {
|
||||
await remove(child.id)
|
||||
}
|
||||
await unshare(sessionID).catch(() => {})
|
||||
// CASCADE delete handles messages and parts automatically
|
||||
Database.use((db) => {
|
||||
db.delete(SessionTable).where(eq(SessionTable.id, sessionID)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(Event.Deleted, {
|
||||
info: session,
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
DatabaseEvent.run(Event.Deleted, { id: sessionID, info: session })
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
}
|
||||
})
|
||||
|
||||
export const updateMessage = fn(MessageV2.Info, async (msg) => {
|
||||
const time_created = msg.time.created
|
||||
const { id, sessionID, ...data } = msg
|
||||
Database.use((db) => {
|
||||
db.insert(MessageTable)
|
||||
.values({
|
||||
id,
|
||||
session_id: sessionID,
|
||||
time_created,
|
||||
data,
|
||||
})
|
||||
.onConflictDoUpdate({ target: MessageTable.id, set: { data } })
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.Updated, {
|
||||
info: msg,
|
||||
}),
|
||||
)
|
||||
DatabaseEvent.run(MessageV2.Event.Updated, {
|
||||
id: msg.sessionID,
|
||||
info: msg,
|
||||
})
|
||||
|
||||
return msg
|
||||
})
|
||||
|
||||
@@ -708,17 +626,9 @@ export namespace Session {
|
||||
messageID: Identifier.schema("message"),
|
||||
}),
|
||||
async (input) => {
|
||||
// CASCADE delete handles parts automatically
|
||||
Database.use((db) => {
|
||||
db.delete(MessageTable)
|
||||
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
}),
|
||||
)
|
||||
DatabaseEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
})
|
||||
return input.messageID
|
||||
},
|
||||
@@ -731,17 +641,10 @@ export namespace Session {
|
||||
partID: Identifier.schema("part"),
|
||||
}),
|
||||
async (input) => {
|
||||
Database.use((db) => {
|
||||
db.delete(PartTable)
|
||||
.where(and(eq(PartTable.id, input.partID), eq(PartTable.session_id, input.sessionID)))
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
}),
|
||||
)
|
||||
DatabaseEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
})
|
||||
return input.partID
|
||||
},
|
||||
@@ -750,24 +653,10 @@ export namespace Session {
|
||||
const UpdatePartInput = MessageV2.Part
|
||||
|
||||
export const updatePart = fn(UpdatePartInput, async (part) => {
|
||||
const { id, messageID, sessionID, ...data } = part
|
||||
const time = Date.now()
|
||||
Database.use((db) => {
|
||||
db.insert(PartTable)
|
||||
.values({
|
||||
id,
|
||||
message_id: messageID,
|
||||
session_id: sessionID,
|
||||
time_created: time,
|
||||
data,
|
||||
})
|
||||
.onConflictDoUpdate({ target: PartTable.id, set: { data } })
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.PartUpdated, {
|
||||
part: structuredClone(part),
|
||||
}),
|
||||
)
|
||||
DatabaseEvent.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
})
|
||||
return part
|
||||
})
|
||||
|
||||
@@ -7,6 +7,7 @@ import { LSP } from "../lsp"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Database, eq, desc, inArray } from "@/storage/db"
|
||||
import { DatabaseEvent } from "../storage/event"
|
||||
import { MessageTable, PartTable } from "./session.sql"
|
||||
import { ProviderTransform } from "@/provider/transform"
|
||||
import { STATUS_CODES } from "http"
|
||||
@@ -448,23 +449,29 @@ export namespace MessageV2 {
|
||||
export type Info = z.infer<typeof Info>
|
||||
|
||||
export const Event = {
|
||||
Updated: BusEvent.define(
|
||||
Updated: DatabaseEvent.define(
|
||||
"message.updated",
|
||||
"v1",
|
||||
z.object({
|
||||
id: z.string(),
|
||||
info: Info,
|
||||
}),
|
||||
),
|
||||
Removed: BusEvent.define(
|
||||
Removed: DatabaseEvent.agg("sessionID").define(
|
||||
"message.removed",
|
||||
"v1",
|
||||
z.object({
|
||||
sessionID: z.string(),
|
||||
messageID: z.string(),
|
||||
}),
|
||||
),
|
||||
PartUpdated: BusEvent.define(
|
||||
PartUpdated: DatabaseEvent.agg("sessionID").define(
|
||||
"message.part.updated",
|
||||
"v1",
|
||||
z.object({
|
||||
sessionID: z.string(),
|
||||
part: Part,
|
||||
time: z.number(),
|
||||
}),
|
||||
),
|
||||
PartDelta: BusEvent.define(
|
||||
@@ -477,8 +484,9 @@ export namespace MessageV2 {
|
||||
delta: z.string(),
|
||||
}),
|
||||
),
|
||||
PartRemoved: BusEvent.define(
|
||||
PartRemoved: DatabaseEvent.agg("sessionID").define(
|
||||
"message.part.removed",
|
||||
"v1",
|
||||
z.object({
|
||||
sessionID: z.string(),
|
||||
messageID: z.string(),
|
||||
|
||||
142
packages/opencode/src/session/projectors.ts
Normal file
142
packages/opencode/src/session/projectors.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
import { Bus } from "@/bus"
|
||||
import { NotFoundError, eq, and } from "../storage/db"
|
||||
import { DatabaseEvent } from "@/storage/event"
|
||||
import { Session } from "./index"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { SessionTable, MessageTable, PartTable } from "./session.sql"
|
||||
import { ProjectTable } from "../project/project.sql"
|
||||
|
||||
type DeepPartial<T> = {
|
||||
[K in keyof T]?: T[K] extends object ? DeepPartial<T[K]> : T[K]
|
||||
}
|
||||
|
||||
function grab<T extends object, K1 extends keyof T, X>(
|
||||
obj: T,
|
||||
field1: K1,
|
||||
cb?: (val: NonNullable<T[K1]>) => X,
|
||||
): X | undefined {
|
||||
if (obj == undefined || !(field1 in obj)) return undefined
|
||||
|
||||
const val = obj[field1]
|
||||
if (val && typeof val === "object" && cb) {
|
||||
return cb(val)
|
||||
}
|
||||
return (val === undefined ? null : val) as X | undefined
|
||||
}
|
||||
|
||||
export function toPartialRow(info: DeepPartial<Session.Info>) {
|
||||
const obj = {
|
||||
id: grab(info, "id"),
|
||||
project_id: grab(info, "projectID"),
|
||||
workspace_id: grab(info, "workspaceID"),
|
||||
parent_id: grab(info, "parentID"),
|
||||
slug: grab(info, "slug"),
|
||||
directory: grab(info, "directory"),
|
||||
title: grab(info, "title"),
|
||||
version: grab(info, "version"),
|
||||
share_url: grab(info, "share", (v) => grab(v, "url")),
|
||||
summary_additions: grab(info, "summary", (v) => grab(v, "additions")),
|
||||
summary_deletions: grab(info, "summary", (v) => grab(v, "deletions")),
|
||||
summary_files: grab(info, "summary", (v) => grab(v, "files")),
|
||||
summary_diffs: grab(info, "summary", (v) => grab(v, "diffs")),
|
||||
revert: grab(info, "revert"),
|
||||
permission: grab(info, "permission"),
|
||||
time_created: grab(info, "time", (v) => grab(v, "created")),
|
||||
time_updated: grab(info, "time", (v) => grab(v, "updated")),
|
||||
time_compacting: grab(info, "time", (v) => grab(v, "compacting")),
|
||||
time_archived: grab(info, "time", (v) => grab(v, "archived")),
|
||||
}
|
||||
|
||||
return Object.fromEntries(Object.entries(obj).filter(([_, val]) => val !== undefined))
|
||||
}
|
||||
|
||||
DatabaseEvent.addProjector(Session.Event.Created, (db, data) => {
|
||||
const existing = db
|
||||
.select({ id: ProjectTable.id })
|
||||
.from(ProjectTable)
|
||||
.where(eq(ProjectTable.id, data.info.projectID))
|
||||
.get()
|
||||
if (!existing) {
|
||||
// Create a (temporary) project to make this work. In the future
|
||||
// we should separate sessions and projects
|
||||
db.insert(ProjectTable)
|
||||
.values({
|
||||
id: data.info.projectID,
|
||||
worktree: data.info.directory,
|
||||
sandboxes: [],
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
db.insert(SessionTable).values(Session.toRow(data.info)).run()
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(Session.Event.Updated, (db, data) => {
|
||||
const info = data.info
|
||||
const row = db.update(SessionTable).set(toPartialRow(info)).where(eq(SessionTable.id, data.id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${data.id}` })
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(Session.Event.Shared, (db, data) => {
|
||||
const row = db.update(SessionTable).set({ share_url: data.url }).where(eq(SessionTable.id, data.id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${data.id}` })
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(Session.Event.Touch, (db, data) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ time_updated: data.time })
|
||||
.where(eq(SessionTable.id, data.id))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${data.id}` })
|
||||
|
||||
// const info = Session.fromRow(row)
|
||||
// Database.effect(() => Bus.publish(Event.Updated, { id: data.id, info }))
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(Session.Event.Deleted, (db, data) => {
|
||||
db.delete(SessionTable).where(eq(SessionTable.id, data.id)).run()
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(MessageV2.Event.Updated, (db, data) => {
|
||||
const time_created = data.info.time.created
|
||||
const { id, sessionID, ...rest } = data.info
|
||||
|
||||
db.insert(MessageTable)
|
||||
.values({
|
||||
id,
|
||||
session_id: sessionID,
|
||||
time_created,
|
||||
data: rest,
|
||||
})
|
||||
.onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } })
|
||||
.run()
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(MessageV2.Event.Removed, (db, data) => {
|
||||
db.delete(MessageTable)
|
||||
.where(and(eq(MessageTable.id, data.messageID), eq(MessageTable.session_id, data.sessionID)))
|
||||
.run()
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(MessageV2.Event.PartRemoved, (db, data) => {
|
||||
db.delete(PartTable)
|
||||
.where(and(eq(PartTable.id, data.partID), eq(PartTable.session_id, data.sessionID)))
|
||||
.run()
|
||||
})
|
||||
|
||||
DatabaseEvent.addProjector(MessageV2.Event.PartUpdated, (db, data) => {
|
||||
const { id, messageID, sessionID, ...rest } = data.part
|
||||
|
||||
db.insert(PartTable)
|
||||
.values({
|
||||
id,
|
||||
message_id: messageID,
|
||||
session_id: sessionID,
|
||||
time_created: data.time,
|
||||
data: rest,
|
||||
})
|
||||
.onConflictDoUpdate({ target: PartTable.id, set: { data: rest } })
|
||||
.run()
|
||||
})
|
||||
@@ -5,7 +5,8 @@ import { MessageV2 } from "./message-v2"
|
||||
import { Session } from "."
|
||||
import { Log } from "../util/log"
|
||||
import { Database, eq } from "../storage/db"
|
||||
import { MessageTable, PartTable } from "./session.sql"
|
||||
import { DatabaseEvent } from "../storage/event"
|
||||
import { PartTable } from "./session.sql"
|
||||
import { Storage } from "@/storage/storage"
|
||||
import { Bus } from "../bus"
|
||||
import { SessionPrompt } from "./prompt"
|
||||
@@ -113,8 +114,10 @@ export namespace SessionRevert {
|
||||
remove.push(msg)
|
||||
}
|
||||
for (const msg of remove) {
|
||||
Database.use((db) => db.delete(MessageTable).where(eq(MessageTable.id, msg.info.id)).run())
|
||||
await Bus.publish(MessageV2.Event.Removed, { sessionID: sessionID, messageID: msg.info.id })
|
||||
DatabaseEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: sessionID,
|
||||
messageID: msg.info.id,
|
||||
})
|
||||
}
|
||||
if (session.revert.partID && target) {
|
||||
const partID = session.revert.partID
|
||||
@@ -124,8 +127,7 @@ export namespace SessionRevert {
|
||||
const removeParts = target.parts.slice(removeStart)
|
||||
target.parts = preserveParts
|
||||
for (const part of removeParts) {
|
||||
Database.use((db) => db.delete(PartTable).where(eq(PartTable.id, part.id)).run())
|
||||
await Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
DatabaseEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: sessionID,
|
||||
messageID: target.info.id,
|
||||
partID: part.id,
|
||||
|
||||
@@ -64,38 +64,37 @@ export namespace ShareNext {
|
||||
export async function init() {
|
||||
if (disabled) return
|
||||
Bus.subscribe(Session.Event.Updated, async (evt) => {
|
||||
await sync(evt.properties.info.id, [
|
||||
const session = await Session.get(evt.properties.data.id)
|
||||
|
||||
await sync(session.id, [
|
||||
{
|
||||
type: "session",
|
||||
data: evt.properties.info,
|
||||
data: session,
|
||||
},
|
||||
])
|
||||
})
|
||||
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
|
||||
await sync(evt.properties.info.sessionID, [
|
||||
const info = evt.properties.data.info
|
||||
await sync(info.sessionID, [
|
||||
{
|
||||
type: "message",
|
||||
data: evt.properties.info,
|
||||
data: evt.properties.data.info,
|
||||
},
|
||||
])
|
||||
if (evt.properties.info.role === "user") {
|
||||
await sync(evt.properties.info.sessionID, [
|
||||
if (info.role === "user") {
|
||||
await sync(info.sessionID, [
|
||||
{
|
||||
type: "model",
|
||||
data: [
|
||||
await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then(
|
||||
(m) => m,
|
||||
),
|
||||
],
|
||||
data: [await Provider.getModel(info.model.providerID, info.model.modelID).then((m) => m)],
|
||||
},
|
||||
])
|
||||
}
|
||||
})
|
||||
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
|
||||
await sync(evt.properties.part.sessionID, [
|
||||
await sync(evt.properties.data.part.sessionID, [
|
||||
{
|
||||
type: "part",
|
||||
data: evt.properties.part,
|
||||
data: evt.properties.data.part,
|
||||
},
|
||||
])
|
||||
})
|
||||
|
||||
@@ -28,12 +28,19 @@ export const NotFoundError = NamedError.create(
|
||||
const log = Log.create({ service: "db" })
|
||||
|
||||
export namespace Database {
|
||||
export const Path = iife(() => {
|
||||
export function getChannelPath() {
|
||||
const channel = Installation.CHANNEL
|
||||
if (["latest", "beta"].includes(channel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
|
||||
return path.join(Global.Path.data, "opencode.db")
|
||||
const safe = channel.replace(/[^a-zA-Z0-9._-]/g, "-")
|
||||
return path.join(Global.Path.data, `opencode-${safe}.db`)
|
||||
}
|
||||
|
||||
export const Path = iife(() => {
|
||||
if (Installation.isTesting()) {
|
||||
return ":memory:"
|
||||
}
|
||||
return getChannelPath()
|
||||
})
|
||||
|
||||
type Schema = typeof schema
|
||||
@@ -153,19 +160,37 @@ export namespace Database {
|
||||
}
|
||||
}
|
||||
|
||||
export function transaction<T>(callback: (tx: TxOrDb) => T): T {
|
||||
type NotPromise<T> = T extends Promise<any> ? never : T
|
||||
|
||||
export function _transaction<T>(
|
||||
callback: (tx: TxOrDb) => NotPromise<T>,
|
||||
options?: {
|
||||
behavior?: "deferred" | "immediate" | "exclusive"
|
||||
},
|
||||
): NotPromise<T> {
|
||||
try {
|
||||
return callback(ctx.use().tx)
|
||||
} catch (err) {
|
||||
if (err instanceof Context.NotFound) {
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const result = (Client().transaction as any)((tx: TxOrDb) => {
|
||||
return ctx.provide({ tx, effects }, () => callback(tx))
|
||||
})
|
||||
const result = Client().transaction(
|
||||
(tx: TxOrDb) => {
|
||||
return ctx.provide({ tx, effects }, () => callback(tx))
|
||||
},
|
||||
{ behavior: options?.behavior },
|
||||
)
|
||||
for (const effect of effects) effect()
|
||||
return result
|
||||
return result as NotPromise<T>
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
export function transaction<T>(callback: (tx: TxOrDb) => NotPromise<T>) {
|
||||
return _transaction(callback)
|
||||
}
|
||||
|
||||
export function immediateTransaction<T>(callback: (tx: TxOrDb) => NotPromise<T>) {
|
||||
return _transaction(callback, { behavior: "immediate" })
|
||||
}
|
||||
}
|
||||
|
||||
15
packages/opencode/src/storage/event.sql.ts
Normal file
15
packages/opencode/src/storage/event.sql.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"
|
||||
|
||||
export const EventSequenceTable = sqliteTable("event_sequence", {
|
||||
aggregate_id: text().notNull().primaryKey(),
|
||||
seq: integer().notNull(),
|
||||
})
|
||||
|
||||
export const EventTable = sqliteTable("event", {
|
||||
seq: integer().notNull(),
|
||||
aggregateId: text()
|
||||
.notNull()
|
||||
.references(() => EventSequenceTable.aggregate_id, { onDelete: "cascade" }),
|
||||
name: text().notNull(),
|
||||
data: text({ mode: "json" }).$type<Record<string, unknown>>().notNull(),
|
||||
})
|
||||
151
packages/opencode/src/storage/event.ts
Normal file
151
packages/opencode/src/storage/event.ts
Normal file
@@ -0,0 +1,151 @@
|
||||
import z from "zod"
|
||||
import type { ZodObject } from "zod"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { Database, eq, max } from "./db"
|
||||
import { EventSequenceTable, EventTable } from "./event.sql"
|
||||
|
||||
export namespace DatabaseEvent {
|
||||
export type Definition = {
|
||||
type: string
|
||||
properties: ZodObject<{ seq: z.ZodNumber; aggregateId: z.ZodString; data: z.ZodObject }>
|
||||
version: string
|
||||
aggregateField: string
|
||||
}
|
||||
|
||||
export type Event<Def extends Definition = Definition> = {
|
||||
seq: number
|
||||
aggregateId: string
|
||||
data: z.infer<Def["properties"]>["data"]
|
||||
}
|
||||
|
||||
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
|
||||
|
||||
const projectors = new Map<Definition, (db: Database.TxOrDb, data: unknown) => void>()
|
||||
const registry = new Map<string, Definition>()
|
||||
|
||||
export function versionedName(type: string, version?: string) {
|
||||
return version ? `${type}.${version}` : type
|
||||
}
|
||||
|
||||
export function define<Type extends string, Properties extends ZodObject<{ id: z.ZodString }>>(
|
||||
type: Type,
|
||||
version: string,
|
||||
properties: Properties,
|
||||
) {
|
||||
return agg("id").define(type, version, properties)
|
||||
}
|
||||
|
||||
export function agg<F extends string>(aggregateField: F) {
|
||||
return {
|
||||
define<Type extends string, Data extends ZodObject<Record<F, z.ZodString>>>(
|
||||
type: Type,
|
||||
version: string,
|
||||
data: Data,
|
||||
) {
|
||||
const def = {
|
||||
type,
|
||||
properties: z.object({ seq: z.number(), aggregateId: z.string(), data }),
|
||||
version,
|
||||
aggregateField,
|
||||
}
|
||||
|
||||
registry.set(versionedName(def.type, def.version), def)
|
||||
BusEvent.define(versionedName(def.type, def.version), def.properties)
|
||||
|
||||
return def
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function addProjector<Def extends Definition>(
|
||||
def: Def,
|
||||
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
|
||||
) {
|
||||
projectors.set(def, func as (db: Database.TxOrDb, data: unknown) => void)
|
||||
}
|
||||
|
||||
function process<Def extends Definition>(def: Def, input: Event<Def>) {
|
||||
const projector = projectors.get(def)
|
||||
if (!projector) {
|
||||
throw new Error(`Projector not found for event: ${def.type}`)
|
||||
}
|
||||
|
||||
// idempotent: need to ignore any events already logged
|
||||
|
||||
Database.transaction((tx) => {
|
||||
projector(tx, input.data)
|
||||
tx.insert(EventSequenceTable)
|
||||
.values({
|
||||
aggregate_id: input.aggregateId,
|
||||
seq: input.seq,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: EventSequenceTable.aggregate_id,
|
||||
set: { seq: input.seq },
|
||||
})
|
||||
.run()
|
||||
tx.insert(EventTable)
|
||||
.values({
|
||||
seq: input.seq,
|
||||
aggregateId: input.aggregateId,
|
||||
name: versionedName(def.type, def.version),
|
||||
data: input.data as Record<string, unknown>,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
}
|
||||
|
||||
// TODO:
|
||||
//
|
||||
// * Support applying multiple events at one time. One transaction,
|
||||
// and it validets all the sequence ids
|
||||
// * when loading events from db, apply zod validation to ensure shape
|
||||
|
||||
export function replay(event: SerializedEvent) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
console.log(registry)
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const maxSeq = Database.use((db) =>
|
||||
db
|
||||
.select({ val: max(EventTable.seq) })
|
||||
.from(EventTable)
|
||||
.where(eq(EventTable.aggregateId, event.aggregateId))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const expected = maxSeq?.val ? maxSeq.val + 1 : 0
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(`Sequence mismatch for aggregate "${event.aggregateId}": expected ${expected}, got ${event.seq}`)
|
||||
}
|
||||
|
||||
process(def, event)
|
||||
}
|
||||
|
||||
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
|
||||
const agg = (data as Record<string, string>)[def.aggregateField]
|
||||
// This should never happen: we've enforced it via typescript in
|
||||
// the definition
|
||||
if (agg == null) {
|
||||
throw new Error(`DatabaseEvent: "${def.aggregateField}" required but not found: ${JSON.stringify(event)}`)
|
||||
}
|
||||
|
||||
Database.immediateTransaction((tx) => {
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = (row?.seq ?? 0) + 1
|
||||
process(def, { seq, aggregateId: agg, data })
|
||||
|
||||
Database.effect(() => {
|
||||
const versionedDef = { ...def, type: versionedName(def.type, def.version) }
|
||||
Bus.publish(versionedDef, { seq, aggregateId: agg, data } as z.output<Def["properties"]>)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
86
packages/opencode/src/test-projection.ts
Normal file
86
packages/opencode/src/test-projection.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import os from "os"
|
||||
import path from "path"
|
||||
import fs from "fs/promises"
|
||||
|
||||
// Set XDG env vars BEFORE any src/ imports to isolate from real data
|
||||
const dir = path.join(os.tmpdir(), "opencode-test-projection-" + process.pid)
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
process.env["XDG_DATA_HOME"] = path.join(dir, "share")
|
||||
process.env["XDG_CACHE_HOME"] = path.join(dir, "cache")
|
||||
process.env["XDG_CONFIG_HOME"] = path.join(dir, "config")
|
||||
process.env["XDG_STATE_HOME"] = path.join(dir, "state")
|
||||
|
||||
// Write the cache version file
|
||||
const cache = path.join(dir, "cache", "opencode")
|
||||
await fs.mkdir(cache, { recursive: true })
|
||||
await fs.writeFile(path.join(cache, "version"), "14")
|
||||
|
||||
// Now safe to import src/
|
||||
const { Log } = await import("@/util/log")
|
||||
Log.init({ print: true, dev: true, level: "DEBUG" })
|
||||
|
||||
const { Instance } = await import("@/project/instance")
|
||||
const { Database } = await import("@/storage/db")
|
||||
const { GlobalBus } = await import("@/bus/global")
|
||||
const { Bus } = await import("@/bus")
|
||||
const { Session } = await import("@/session")
|
||||
const { Server } = await import("@/server/server")
|
||||
const { SessionPrompt } = await import("@/session/prompt")
|
||||
|
||||
// register projectors
|
||||
await import("@/session/projectors")
|
||||
|
||||
async function wait(ms: number) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms)
|
||||
})
|
||||
}
|
||||
|
||||
async function run() {
|
||||
console.log("project id:", Instance.project.id)
|
||||
|
||||
// start the server
|
||||
const server = Server.listen({
|
||||
port: 0,
|
||||
hostname: "127.0.0.1",
|
||||
})
|
||||
console.log("server listening on:", server.url.toString())
|
||||
console.log("SSE endpoint:", `${server.url}event`)
|
||||
|
||||
const base = server.url.toString().replace(/\/$/, "")
|
||||
console.log("\nServer running. Try:")
|
||||
console.log(` curl -N ${base}/event`)
|
||||
console.log("\nPress Ctrl+C to stop.\n")
|
||||
|
||||
while (1) {
|
||||
await wait(5000)
|
||||
|
||||
const session = await Session.create({
|
||||
title: "test session",
|
||||
})
|
||||
console.log("created session:", session.id, session.title)
|
||||
|
||||
// send messages to the session
|
||||
async function prompt(text: string) {
|
||||
console.log(`\n--- sending: "${text}" ---`)
|
||||
await SessionPrompt.prompt({
|
||||
sessionID: session.id,
|
||||
parts: [{ type: "text", text }],
|
||||
})
|
||||
console.log(`--- done: "${text}" ---`)
|
||||
}
|
||||
|
||||
await prompt("What is 2 + 2?")
|
||||
await wait(2500)
|
||||
await prompt("Now multiply that by 10")
|
||||
await wait(2500)
|
||||
await prompt("Summarize what we've discussed")
|
||||
}
|
||||
|
||||
await new Promise(() => {})
|
||||
}
|
||||
|
||||
await Instance.provide({
|
||||
directory: "~/tmp/project-test7",
|
||||
fn: run,
|
||||
})
|
||||
72
packages/opencode/src/test-replicate.ts
Normal file
72
packages/opencode/src/test-replicate.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import os from "os"
|
||||
import path from "path"
|
||||
import fs from "fs/promises"
|
||||
|
||||
// Set XDG env vars BEFORE any src/ imports to isolate from real data
|
||||
const dir = path.join("/Users/james/tmp/opencode-test-replicate")
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
process.env["XDG_DATA_HOME"] = path.join(dir, "share")
|
||||
process.env["XDG_CACHE_HOME"] = path.join(dir, "cache")
|
||||
process.env["XDG_CONFIG_HOME"] = path.join(dir, "config")
|
||||
process.env["XDG_STATE_HOME"] = path.join(dir, "state")
|
||||
|
||||
// Write the cache version file
|
||||
const cache = path.join(dir, "cache", "opencode")
|
||||
await fs.mkdir(cache, { recursive: true })
|
||||
await fs.writeFile(path.join(cache, "version"), "14")
|
||||
|
||||
// Now safe to import src/
|
||||
const { Log } = await import("@/util/log")
|
||||
Log.init({ print: true, dev: true, level: "DEBUG" })
|
||||
|
||||
const { Instance } = await import("@/project/instance")
|
||||
const { Database } = await import("@/storage/db")
|
||||
const { DatabaseEvent } = await import("@/storage/event")
|
||||
const { parseSSE } = await import("@/control-plane/sse")
|
||||
|
||||
// register projectors so apply can find them
|
||||
await import("@/session/projectors")
|
||||
|
||||
const url = process.argv[2] || "http://127.0.0.1:4096/global/db-event"
|
||||
const ac = new AbortController()
|
||||
|
||||
process.on("SIGINT", () => ac.abort())
|
||||
process.on("SIGTERM", () => ac.abort())
|
||||
|
||||
async function run() {
|
||||
const res = await fetch(url, {
|
||||
headers: { accept: "text/event-stream" },
|
||||
signal: ac.signal,
|
||||
})
|
||||
|
||||
if (!res.ok) {
|
||||
console.error("failed to connect:", res.status, await res.text())
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (!res.body) {
|
||||
console.error("no response body")
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
console.log("connected, listening for events...\n")
|
||||
|
||||
await parseSSE(res.body, ac.signal, (event: any) => {
|
||||
console.log("[sse]", JSON.stringify(event, null, 2))
|
||||
if (event.type && event.data) {
|
||||
try {
|
||||
DatabaseEvent.replay(event)
|
||||
console.log("[apply] ok:", event.type)
|
||||
console.log("db path", Database.Path)
|
||||
} catch (err) {
|
||||
console.error("[apply] error:", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
console.log("\ndisconnected")
|
||||
Database.close()
|
||||
// await fs.rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
|
||||
run()
|
||||
@@ -81,3 +81,6 @@ Log.init({
|
||||
dev: true,
|
||||
level: "DEBUG",
|
||||
})
|
||||
|
||||
// Install database projectors
|
||||
import "../src/session/projectors"
|
||||
|
||||
@@ -20,7 +20,7 @@ describe("session.started event", () => {
|
||||
|
||||
const unsub = Bus.subscribe(Session.Event.Created, (event) => {
|
||||
eventReceived = true
|
||||
receivedInfo = event.properties.info as Session.Info
|
||||
receivedInfo = event.properties.data.info as Session.Info
|
||||
})
|
||||
|
||||
const session = await Session.create({})
|
||||
@@ -95,7 +95,7 @@ describe("step-finish token propagation via Bus event", () => {
|
||||
|
||||
let received: MessageV2.Part | undefined
|
||||
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, (event) => {
|
||||
received = event.properties.part
|
||||
received = event.properties.data.part
|
||||
})
|
||||
|
||||
const tokens = {
|
||||
|
||||
@@ -46,6 +46,7 @@ Instructions here.
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const skills = await Skill.all()
|
||||
console.log(skills)
|
||||
expect(skills.length).toBe(1)
|
||||
const testSkill = skills.find((s) => s.name === "test-skill")
|
||||
expect(testSkill).toBeDefined()
|
||||
|
||||
@@ -5,7 +5,7 @@ import { Database } from "../../src/storage/db"
|
||||
|
||||
describe("Database.Path", () => {
|
||||
test("returns database path for the current channel", () => {
|
||||
const file = path.basename(Database.Path)
|
||||
const file = path.basename(Database.getChannelPath())
|
||||
const expected = ["latest", "beta"].includes(Installation.CHANNEL)
|
||||
? "opencode.db"
|
||||
: `opencode-${Installation.CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")}.db`
|
||||
|
||||
149
packages/opencode/test/storage/event.test.ts
Normal file
149
packages/opencode/test/storage/event.test.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
import { describe, test, expect, beforeEach } from "bun:test"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { DatabaseEvent } from "../../src/storage/event"
|
||||
import { Database } from "../../src/storage/db"
|
||||
import { EventTable } from "../../src/storage/event.sql"
|
||||
import { Identifier } from "../../src/id/id"
|
||||
|
||||
beforeEach(() => {
|
||||
Database.Client.reset()
|
||||
})
|
||||
|
||||
function withInstance(fn: () => void | Promise<void>) {
|
||||
return async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
await fn()
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
describe("DatabaseEvent", () => {
|
||||
const Created = DatabaseEvent.define("item.created", "v1", z.object({ id: z.string(), name: z.string() }))
|
||||
const Sent = DatabaseEvent.agg("item_id").define("item.sent", "v1", z.object({ item_id: z.string(), to: z.string() }))
|
||||
|
||||
DatabaseEvent.addProjector(Created, () => {})
|
||||
DatabaseEvent.addProjector(Sent, () => {})
|
||||
|
||||
describe("run", () => {
|
||||
test(
|
||||
"inserts event row",
|
||||
withInstance(() => {
|
||||
DatabaseEvent.run(Created, { id: "msg_1", name: "first" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].name).toBe("item.created.v1")
|
||||
expect(rows[0].aggregateId).toBe("msg_1")
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"increments seq per aggregate",
|
||||
withInstance(() => {
|
||||
DatabaseEvent.run(Created, { id: "msg_1", name: "first" })
|
||||
DatabaseEvent.run(Created, { id: "msg_1", name: "second" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(2)
|
||||
expect(rows[1].seq).toBe(rows[0].seq + 1)
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"uses custom aggregate field from agg()",
|
||||
withInstance(() => {
|
||||
DatabaseEvent.run(Sent, { item_id: "msg_1", to: "james" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregateId).toBe("msg_1")
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"emits events",
|
||||
withInstance(() => {
|
||||
const events: Array<{
|
||||
type: string
|
||||
properties: { seq: number; aggregateId: string; data: { id: string; name: string } }
|
||||
}> = []
|
||||
const unsub = Bus.subscribeAll((event) => events.push(event))
|
||||
|
||||
DatabaseEvent.run(Created, { id: "msg_1", name: "test" })
|
||||
|
||||
expect(events).toHaveLength(1)
|
||||
expect(events[0]).toEqual({
|
||||
type: "item.created.v1",
|
||||
properties: {
|
||||
seq: 1,
|
||||
aggregateId: "msg_1",
|
||||
data: {
|
||||
id: "msg_1",
|
||||
name: "test",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
unsub()
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("replay", () => {
|
||||
test(
|
||||
"inserts event from external payload",
|
||||
withInstance(() => {
|
||||
const id = Identifier.descending("message")
|
||||
DatabaseEvent.replay({
|
||||
type: "item.created.v1",
|
||||
seq: 0,
|
||||
aggregateId: id,
|
||||
data: { id, name: "replayed" },
|
||||
})
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregateId).toBe(id)
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"throws on sequence mismatch",
|
||||
withInstance(() => {
|
||||
const id = Identifier.descending("message")
|
||||
DatabaseEvent.replay({
|
||||
type: "item.created.v1",
|
||||
seq: 0,
|
||||
aggregateId: id,
|
||||
data: { id, name: "first" },
|
||||
})
|
||||
expect(() =>
|
||||
DatabaseEvent.replay({
|
||||
type: "item.created.v1",
|
||||
seq: 5,
|
||||
aggregateId: id,
|
||||
data: { id, name: "bad" },
|
||||
}),
|
||||
).toThrow(/Sequence mismatch/)
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"throws on unknown event type",
|
||||
withInstance(() => {
|
||||
expect(() =>
|
||||
DatabaseEvent.replay({
|
||||
type: "unknown.event.1",
|
||||
seq: 0,
|
||||
aggregateId: "x",
|
||||
data: {},
|
||||
}),
|
||||
).toThrow(/Unknown event type/)
|
||||
}),
|
||||
)
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user