Compare commits

...

1 Commit

Author SHA1 Message Date
James Long
4e51c3611a feat(core): initial implementation of syncing 2026-03-16 10:39:25 -04:00
13 changed files with 2037 additions and 114 deletions

View File

@@ -0,0 +1,12 @@
-- One row per aggregate: the highest sequence number applied to that stream.
CREATE TABLE `event_sequence` (
`aggregate_id` text PRIMARY KEY,
`seq` integer NOT NULL
);
--> statement-breakpoint
-- Append-only event log. Deleting an aggregate's `event_sequence` row
-- cascades and removes its events.
-- NOTE(review): column naming is inconsistent (`aggregate_id` snake_case
-- above vs `aggregateId` camelCase here) — worth normalizing before release.
CREATE TABLE `event` (
`seq` integer NOT NULL,
`aggregateId` text NOT NULL,
`name` text NOT NULL,
`data` text NOT NULL,
CONSTRAINT `fk_event_aggregateId_event_sequence_aggregate_id_fk` FOREIGN KEY (`aggregateId`) REFERENCES `event_sequence`(`aggregate_id`) ON DELETE CASCADE
);

File diff suppressed because it is too large Load Diff

View File

@@ -90,6 +90,10 @@ export namespace Installation {
return CHANNEL === "local"
}
export function isTesting() {
return process.env.NODE_ENV === "test"
}
export async function method() {
if (process.execPath.includes(path.join(".opencode", "bin"))) return "curl"
if (process.execPath.includes(path.join(".local", "bin"))) return "curl"

View File

@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { DatabaseEvent } from "@/storage/event"
import { Instance } from "../../project/instance"
import { Installation } from "@/installation"
import { Log } from "../../util/log"
@@ -107,6 +108,75 @@ export const GlobalRoutes = lazy(() =>
})
},
)
.get(
"/db-event",
describeRoute({
summary: "Get database global events",
description: "Subscribe to database global events from the OpenCode system using server-sent events.",
operationId: "global.db-event",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(
z
.object({
directory: z.string(),
payload: BusEvent.payloads(),
})
.meta({
ref: "GlobalEvent",
}),
),
},
},
},
},
}),
async (c) => {
log.info("global event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.connected",
properties: {},
},
}),
})
async function handler(event: any) {
await stream.writeSSE({
data: JSON.stringify(event),
})
}
DatabaseEvent.Bus.on("event", handler)
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.heartbeat",
properties: {},
},
}),
})
}, 10_000)
await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
DatabaseEvent.Bus.off("event", handler)
resolve()
log.info("db global event disconnected")
})
})
})
},
)
.get(
"/config",
describeRoute({

View File

@@ -10,9 +10,10 @@ import { Flag } from "../flag/flag"
import { Identifier } from "../id/id"
import { Installation } from "../installation"
import { Database, NotFoundError, eq, and, or, gte, isNull, desc, like, inArray, lt } from "../storage/db"
import { Database, NotFoundError, eq, and, gte, isNull, desc, like, inArray, lt } from "../storage/db"
import { DatabaseEvent } from "../storage/event"
import type { SQL } from "../storage/db"
import { SessionTable, MessageTable, PartTable } from "./session.sql"
import { SessionTable } from "./session.sql"
import { ProjectTable } from "../project/project.sql"
import { Storage } from "@/storage/storage"
import { Log } from "../util/log"
@@ -179,26 +180,42 @@ export namespace Session {
export type GlobalInfo = z.output<typeof GlobalInfo>
export const Event = {
Created: BusEvent.define(
Created: DatabaseEvent.define(
"session.created",
"v1",
z.object({
id: z.string(),
info: Info,
}),
),
Updated: BusEvent.define(
Shared: DatabaseEvent.define(
"session.shared",
"v1",
z.object({
id: z.string(),
url: z.string().optional(),
}),
),
Touch: DatabaseEvent.define("session.touch", "v1", z.object({ id: z.string(), time: z.number() })),
Updated: DatabaseEvent.define(
"session.updated",
"v1",
z.object({
id: z.string(),
info: Info,
}),
),
Deleted: BusEvent.define(
Deleted: DatabaseEvent.define(
"session.deleted",
"v1",
z.object({
id: z.string(),
info: Info,
}),
),
Diff: BusEvent.define(
Diff: DatabaseEvent.agg("sessionID").define(
"session.diff",
"v1",
z.object({
sessionID: z.string(),
diff: Snapshot.FileDiff.array(),
@@ -277,18 +294,8 @@ export namespace Session {
)
export const touch = fn(Identifier.schema("session"), async (sessionID) => {
const now = Date.now()
Database.use((db) => {
const row = db
.update(SessionTable)
.set({ time_updated: now })
.where(eq(SessionTable.id, sessionID))
.returning()
.get()
if (!row) throw new NotFoundError({ message: `Session not found: ${sessionID}` })
const info = fromRow(row)
Database.effect(() => Bus.publish(Event.Updated, { info }))
})
const time = Date.now()
DatabaseEvent.run(Event.Touch, { id: sessionID, time })
})
export async function createNext(input: {
@@ -315,20 +322,16 @@ export namespace Session {
},
}
log.info("created", result)
Database.use((db) => {
db.insert(SessionTable).values(toRow(result)).run()
Database.effect(() =>
Bus.publish(Event.Created, {
info: result,
}),
)
})
DatabaseEvent.run(Event.Created, { id: result.id, info: result })
const cfg = await Config.get()
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto"))
share(result.id).catch(() => {
// Silently ignore sharing errors during session creation
})
Bus.publish(Event.Updated, {
id: result.id,
info: result,
})
return result
@@ -354,12 +357,9 @@ export namespace Session {
}
const { ShareNext } = await import("@/share/share-next")
const share = await ShareNext.create(id)
Database.use((db) => {
const row = db.update(SessionTable).set({ share_url: share.url }).where(eq(SessionTable.id, id)).returning().get()
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
const info = fromRow(row)
Database.effect(() => Bus.publish(Event.Updated, { info }))
})
DatabaseEvent.run(Event.Shared, { id, url: share.url })
return share
})
@@ -367,12 +367,8 @@ export namespace Session {
// Use ShareNext to remove the share (same as share function uses ShareNext to create)
const { ShareNext } = await import("@/share/share-next")
await ShareNext.remove(id)
Database.use((db) => {
const row = db.update(SessionTable).set({ share_url: null }).where(eq(SessionTable.id, id)).returning().get()
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
const info = fromRow(row)
Database.effect(() => Bus.publish(Event.Updated, { info }))
})
DatabaseEvent.run(Event.Shared, { id, url: undefined })
})
export const setTitle = fn(
@@ -659,46 +655,25 @@ export namespace Session {
})
export const remove = fn(Identifier.schema("session"), async (sessionID) => {
const project = Instance.project
try {
const session = await get(sessionID)
for (const child of await children(sessionID)) {
await remove(child.id)
}
await unshare(sessionID).catch(() => {})
// CASCADE delete handles messages and parts automatically
Database.use((db) => {
db.delete(SessionTable).where(eq(SessionTable.id, sessionID)).run()
Database.effect(() =>
Bus.publish(Event.Deleted, {
info: session,
}),
)
})
DatabaseEvent.run(Event.Deleted, { id: sessionID, info: session })
} catch (e) {
log.error(e)
}
})
export const updateMessage = fn(MessageV2.Info, async (msg) => {
const time_created = msg.time.created
const { id, sessionID, ...data } = msg
Database.use((db) => {
db.insert(MessageTable)
.values({
id,
session_id: sessionID,
time_created,
data,
})
.onConflictDoUpdate({ target: MessageTable.id, set: { data } })
.run()
Database.effect(() =>
Bus.publish(MessageV2.Event.Updated, {
info: msg,
}),
)
DatabaseEvent.run(MessageV2.Event.Updated, {
id: msg.sessionID,
info: msg,
})
return msg
})
@@ -708,17 +683,9 @@ export namespace Session {
messageID: Identifier.schema("message"),
}),
async (input) => {
// CASCADE delete handles parts automatically
Database.use((db) => {
db.delete(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.run()
Database.effect(() =>
Bus.publish(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
}),
)
DatabaseEvent.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
return input.messageID
},
@@ -731,17 +698,10 @@ export namespace Session {
partID: Identifier.schema("part"),
}),
async (input) => {
Database.use((db) => {
db.delete(PartTable)
.where(and(eq(PartTable.id, input.partID), eq(PartTable.session_id, input.sessionID)))
.run()
Database.effect(() =>
Bus.publish(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
}),
)
DatabaseEvent.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
})
return input.partID
},
@@ -750,24 +710,10 @@ export namespace Session {
const UpdatePartInput = MessageV2.Part
export const updatePart = fn(UpdatePartInput, async (part) => {
const { id, messageID, sessionID, ...data } = part
const time = Date.now()
Database.use((db) => {
db.insert(PartTable)
.values({
id,
message_id: messageID,
session_id: sessionID,
time_created: time,
data,
})
.onConflictDoUpdate({ target: PartTable.id, set: { data } })
.run()
Database.effect(() =>
Bus.publish(MessageV2.Event.PartUpdated, {
part: structuredClone(part),
}),
)
DatabaseEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
})
return part
})

View File

@@ -7,6 +7,7 @@ import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { fn } from "@/util/fn"
import { Database, eq, desc, inArray } from "@/storage/db"
import { DatabaseEvent } from "../storage/event"
import { MessageTable, PartTable } from "./session.sql"
import { ProviderTransform } from "@/provider/transform"
import { STATUS_CODES } from "http"
@@ -448,23 +449,29 @@ export namespace MessageV2 {
export type Info = z.infer<typeof Info>
export const Event = {
Updated: BusEvent.define(
Updated: DatabaseEvent.define(
"message.updated",
"v1",
z.object({
id: z.string(),
info: Info,
}),
),
Removed: BusEvent.define(
Removed: DatabaseEvent.agg("sessionID").define(
"message.removed",
"v1",
z.object({
sessionID: z.string(),
messageID: z.string(),
}),
),
PartUpdated: BusEvent.define(
PartUpdated: DatabaseEvent.agg("sessionID").define(
"message.part.updated",
"v1",
z.object({
sessionID: z.string(),
part: Part,
time: z.number(),
}),
),
PartDelta: BusEvent.define(
@@ -477,8 +484,9 @@ export namespace MessageV2 {
delta: z.string(),
}),
),
PartRemoved: BusEvent.define(
PartRemoved: DatabaseEvent.agg("sessionID").define(
"message.part.removed",
"v1",
z.object({
sessionID: z.string(),
messageID: z.string(),

View File

@@ -0,0 +1,93 @@
import { Bus } from "@/bus"
import { Database, NotFoundError, eq, and } from "../storage/db"
import { DatabaseEvent } from "@/storage/event"
import { Session } from "./index"
import { MessageV2 } from "./message-v2"
import { SessionTable, MessageTable, PartTable } from "./session.sql"
import { ProjectTable } from "../project/project.sql"
// Projects session.created: materializes the session row. If the session's
// project does not exist yet, a placeholder project row is created first so
// the session insert has a valid parent.
DatabaseEvent.addProjector(Session.Event.Created, (db, data) => {
const existing = db
.select({ id: ProjectTable.id })
.from(ProjectTable)
.where(eq(ProjectTable.id, data.info.projectID))
.get()
if (!existing) {
// Create a (temporary) project to make this work. In the future
// we should separate sessions and projects
db.insert(ProjectTable)
.values({
id: data.info.projectID,
worktree: data.info.directory,
sandboxes: [],
})
.run()
}
db.insert(SessionTable).values(Session.toRow(data.info)).run()
})
// Projects session.shared: stores (or clears) the session's share URL.
// The unshare path emits this event with `url: undefined`; drizzle's `set()`
// skips undefined values, which would leave an UPDATE with no columns, so we
// coalesce to null to explicitly clear the column. Also drops a previously
// unused `Session.fromRow(row)` binding.
DatabaseEvent.addProjector(Session.Event.Shared, (db, data) => {
  const row = db
    .update(SessionTable)
    .set({ share_url: data.url ?? null })
    .where(eq(SessionTable.id, data.id))
    .returning()
    .get()
  if (!row) throw new NotFoundError({ message: `Session not found: ${data.id}` })
})
// Projects session.touch: bumps the session's time_updated timestamp to the
// time carried by the event (not the local clock, so replicas agree).
DatabaseEvent.addProjector(Session.Event.Touch, (db, data) => {
const row = db
.update(SessionTable)
.set({ time_updated: data.time })
.where(eq(SessionTable.id, data.id))
.returning()
.get()
if (!row) throw new NotFoundError({ message: `Session not found: ${data.id}` })
// NOTE(review): republishing Event.Updated from here is intentionally
// disabled for now — confirm whether subscribers still need it.
// const info = Session.fromRow(row)
// Database.effect(() => Bus.publish(Event.Updated, { id: data.id, info }))
})
// Projects session.deleted: removes the session row. Dependent message/part
// rows are expected to be cleaned up by the schema's CASCADE delete —
// NOTE(review): confirm the FK cascades cover messages and parts.
DatabaseEvent.addProjector(Session.Event.Deleted, (db, data) => {
db.delete(SessionTable).where(eq(SessionTable.id, data.id)).run()
})
// Projects message.updated: upserts the message row, keyed by message id.
// The id and session id become columns; everything else is stored as the
// JSON payload.
DatabaseEvent.addProjector(MessageV2.Event.Updated, (db, data) => {
  const { id, sessionID, ...payload } = data.info
  const row = {
    id,
    session_id: sessionID,
    time_created: data.info.time.created,
    data: payload,
  }
  db.insert(MessageTable)
    .values(row)
    .onConflictDoUpdate({ target: MessageTable.id, set: { data: payload } })
    .run()
})
// Projects message.removed: deletes the message row, scoped to its session.
DatabaseEvent.addProjector(MessageV2.Event.Removed, (db, data) => {
  const match = and(eq(MessageTable.session_id, data.sessionID), eq(MessageTable.id, data.messageID))
  db.delete(MessageTable).where(match).run()
})
// Projects message.part.removed: deletes the part row, scoped to its session.
DatabaseEvent.addProjector(MessageV2.Event.PartRemoved, (db, data) => {
  const match = and(eq(PartTable.session_id, data.sessionID), eq(PartTable.id, data.partID))
  db.delete(PartTable).where(match).run()
})
// Projects message.part.updated: upserts the part row, keyed by part id.
// Ids become columns; the remainder of the part is stored as JSON payload.
DatabaseEvent.addProjector(MessageV2.Event.PartUpdated, (db, data) => {
  const { id, messageID, sessionID, ...payload } = data.part
  const row = {
    id,
    message_id: messageID,
    session_id: sessionID,
    time_created: data.time,
    data: payload,
  }
  db.insert(PartTable)
    .values(row)
    .onConflictDoUpdate({ target: PartTable.id, set: { data: payload } })
    .run()
})

View File

@@ -29,6 +29,9 @@ const log = Log.create({ service: "db" })
export namespace Database {
export const Path = iife(() => {
if (Installation.isTesting()) {
return ":memory:"
}
const channel = Installation.CHANNEL
if (["latest", "beta"].includes(channel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
return path.join(Global.Path.data, "opencode.db")
@@ -153,19 +156,37 @@ export namespace Database {
}
}
export function transaction<T>(callback: (tx: TxOrDb) => T): T {
type NotPromise<T> = T extends Promise<any> ? never : T
export function _transaction<T>(
callback: (tx: TxOrDb) => NotPromise<T>,
options?: {
behavior?: "deferred" | "immediate" | "exclusive"
},
): NotPromise<T> {
try {
return callback(ctx.use().tx)
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = (Client().transaction as any)((tx: TxOrDb) => {
return ctx.provide({ tx, effects }, () => callback(tx))
})
const result = Client().transaction(
(tx: TxOrDb) => {
return ctx.provide({ tx, effects }, () => callback(tx))
},
{ behavior: options?.behavior },
)
for (const effect of effects) effect()
return result
return result as NotPromise<T>
}
throw err
}
}
export function transaction<T>(callback: (tx: TxOrDb) => NotPromise<T>) {
return _transaction(callback)
}
export function immediateTransaction<T>(callback: (tx: TxOrDb) => NotPromise<T>) {
return _transaction(callback, { behavior: "immediate" })
}
}

View File

@@ -0,0 +1,15 @@
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"
// Head pointer per aggregate: the highest sequence number applied so far.
export const EventSequenceTable = sqliteTable("event_sequence", {
aggregate_id: text().notNull().primaryKey(),
seq: integer().notNull(),
})
// Append-only event log. NOTE(review): column naming is inconsistent with
// event_sequence (`aggregateId` camelCase vs `aggregate_id` snake_case) —
// worth normalizing before this schema ships.
export const EventTable = sqliteTable("event", {
seq: integer().notNull(),
aggregateId: text()
.notNull()
// Deleting an aggregate's sequence row cascades to its events.
.references(() => EventSequenceTable.aggregate_id, { onDelete: "cascade" }),
name: text().notNull(),
// JSON-serialized event payload (shape is defined by the event's zod schema).
data: text({ mode: "json" }).$type<Record<string, unknown>>().notNull(),
})

View File

@@ -0,0 +1,176 @@
import z from "zod"
import type { ZodObject } from "zod"
import { BusEvent } from "@/bus/bus-event"
import { Instance } from "../project/instance"
import { Bus as ProjectBus } from "@/bus"
import { Database, eq, max } from "./db"
import { EventSequenceTable, EventTable } from "./event.sql"
import { EventEmitter } from "events"
export namespace DatabaseEvent {
  /**
   * Event-sourcing layer over the SQLite database.
   *
   * Every event belongs to an "aggregate", identified by one field of its
   * payload, and carries a per-aggregate monotonically increasing sequence
   * number starting at 1. Writing an event runs its registered projector
   * (the table write that materializes it) and appends the event row in a
   * single transaction; afterwards the event is mirrored onto the in-process
   * buses so subscribers and remote replicas can observe it.
   */
  export type Definition = {
    type: string
    properties: ZodObject
    version: string
    // Name of the payload field that holds the aggregate id.
    aggregateField: string
  }

  // All defined events, keyed by versioned name ("<type>.<version>").
  const registry = new Map<string, Definition>()
  // One projector (database write) per event definition.
  const projectors = new Map<Definition, (db: Database.TxOrDb, data: unknown) => void>()

  // Wire format emitted on `Bus` and accepted by `replay`.
  export type BusEvent = {
    type: string
    data: {
      seq: number
      aggregateId: string
      data: Record<string, unknown>
    }
  }

  export const Bus = new EventEmitter<{
    event: [BusEvent]
  }>()

  // "session.created" + "v1" -> "session.created.v1"
  function versionedName(type: string, version: string) {
    return `${type}.${version}`
  }

  // True when running inside an Instance context (project bus is available).
  function hasInstance() {
    try {
      Instance.project
      return true
    } catch {
      return false
    }
  }

  /**
   * Defines an event whose aggregate id lives in the `id` payload field.
   * Shorthand for `agg("id").define(...)`.
   */
  export function define<Type extends string, Properties extends ZodObject<{ id: z.ZodString }>>(
    type: Type,
    version: string,
    properties: Properties,
  ) {
    return agg("id").define(type, version, properties)
  }

  /**
   * Returns a definer for events whose aggregate id lives in a custom payload
   * field (e.g. `agg("sessionID").define(...)`). Registers the definition
   * under its versioned name so `replay` can resolve it later.
   */
  export function agg<F extends string>(aggregateField: F) {
    return {
      define<Type extends string, Properties extends ZodObject<Record<F, z.ZodString>>>(
        type: Type,
        version: string,
        properties: Properties,
      ) {
        const def = {
          ...BusEvent.define(type, properties),
          version,
          aggregateField,
        }
        registry.set(versionedName(def.type, def.version), def)
        return def
      },
    }
  }

  /** Registers the database write that materializes `event` into tables. */
  export function addProjector<Def extends Definition>(
    event: Def,
    func: (db: Database.TxOrDb, data: z.output<Def["properties"]>) => void,
  ) {
    projectors.set(event, func as (db: Database.TxOrDb, data: unknown) => void)
  }

  // Runs the projector and records the event + new sequence head in one
  // transaction. Throws if no projector was registered for the event.
  function process<Def extends Definition>(
    event: Def,
    input: { seq: number; aggregateId: string; data: z.output<Def["properties"]> },
  ) {
    const projector = projectors.get(event)
    if (!projector) {
      throw new Error(`Projector not found for event: ${event.type}`)
    }
    // idempotent
    Database.transaction((tx) => {
      projector(tx, input.data)
      tx.insert(EventSequenceTable)
        .values({
          aggregate_id: input.aggregateId,
          seq: input.seq,
        })
        .onConflictDoUpdate({
          target: EventSequenceTable.aggregate_id,
          set: { seq: input.seq },
        })
        .run()
      tx.insert(EventTable)
        .values({
          seq: input.seq,
          aggregateId: input.aggregateId,
          name: versionedName(event.type, event.version),
          data: input.data as Record<string, unknown>,
        })
        .run()
    })
  }

  // TODO:
  //
  // * Support applying multiple events at one time. One transaction,
  //   and it validates all the sequence ids
  // * when loading events from db, apply zod validation to ensure shape

  /**
   * Applies an event received from elsewhere (e.g. an SSE stream). Enforces
   * that the incoming sequence number is exactly one past the local head for
   * its aggregate; throws on gaps, duplicates, and unknown event types.
   */
  export function replay(event: BusEvent) {
    const def = registry.get(event.type)
    if (!def) {
      throw new Error(`Unknown event type: ${event.type}`)
    }
    const maxSeq = Database.use((db) =>
      db
        .select({ val: max(EventTable.seq) })
        .from(EventTable)
        .where(eq(EventTable.aggregateId, event.data.aggregateId))
        .get(),
    )
    // MAX() yields a row whose value is null when the aggregate has no events
    // yet, so treat "no row" and "null value" alike: the next sequence is 1.
    // (Previously `maxSeq.val! + 1` relied on `null + 1 === 1` coercion, and
    // the no-row branch expected 0 — which would have rejected a stream's
    // first event, since `run` numbers sequences from 1.)
    const expected = (maxSeq?.val ?? 0) + 1
    if (event.data.seq !== expected) {
      throw new Error(
        `Sequence mismatch for aggregate "${event.data.aggregateId}": expected ${expected}, got ${event.data.seq}`,
      )
    }
    process(def, event.data)
  }

  /**
   * Writes a new local event: allocates the next sequence number for its
   * aggregate, projects and persists it, then — after the transaction
   * commits — publishes it to the project bus (when an Instance is active)
   * and to the global `Bus` for replication.
   */
  export function run<Def extends Definition>(event: Def, data: z.output<Def["properties"]>) {
    const agg = data[event.aggregateField] as string
    // This should never happen: we've enforced it via typescript
    if (agg == null) {
      throw new Error(`DatabaseEvent: "${event.aggregateField}" required but not found: ${JSON.stringify(event)}`)
    }
    // Immediate transaction: take the write lock up front so reading the
    // current sequence head and writing the new event are atomic.
    Database.immediateTransaction((tx) => {
      const row = tx
        .select({ seq: EventSequenceTable.seq })
        .from(EventSequenceTable)
        .where(eq(EventSequenceTable.aggregate_id, agg))
        .get()
      const seq = (row?.seq ?? 0) + 1
      process(event, { seq, aggregateId: agg, data })
      // Deferred until the surrounding transaction commits.
      Database.effect(() => {
        if (hasInstance()) {
          ProjectBus.publish(event, data)
        }
        Bus.emit("event", {
          type: versionedName(event.type, event.version),
          data: {
            seq: seq,
            aggregateId: agg,
            data: data,
          },
        })
      })
    })
  }
}

View File

@@ -0,0 +1,86 @@
import os from "os"
import path from "path"
import fs from "fs/promises"

// Set XDG env vars BEFORE any src/ imports to isolate from real data
const dir = path.join(os.tmpdir(), "opencode-test-projection-" + process.pid)
await fs.mkdir(dir, { recursive: true })
process.env["XDG_DATA_HOME"] = path.join(dir, "share")
process.env["XDG_CACHE_HOME"] = path.join(dir, "cache")
process.env["XDG_CONFIG_HOME"] = path.join(dir, "config")
process.env["XDG_STATE_HOME"] = path.join(dir, "state")

// Write the cache version file
const cache = path.join(dir, "cache", "opencode")
await fs.mkdir(cache, { recursive: true })
await fs.writeFile(path.join(cache, "version"), "14")

// Now safe to import src/
const { Log } = await import("@/util/log")
Log.init({ print: true, dev: true, level: "DEBUG" })
const { Instance } = await import("@/project/instance")
const { Database } = await import("@/storage/db")
const { GlobalBus } = await import("@/bus/global")
const { Bus } = await import("@/bus")
const { Session } = await import("@/session")
const { Server } = await import("@/server/server")
const { SessionPrompt } = await import("@/session/prompt")
// register projectors
await import("@/session/projectors")

// Sleep helper.
async function wait(ms: number) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms)
  })
}

// Starts a local server, then forever creates a session every 5s and drives a
// short scripted conversation through it so SSE subscribers see live events.
// Stopped with Ctrl+C.
async function run() {
  console.log("project id:", Instance.project.id)
  // start the server
  const server = Server.listen({
    port: 0,
    hostname: "127.0.0.1",
  })
  console.log("server listening on:", server.url.toString())
  console.log("SSE endpoint:", `${server.url}event`)
  const base = server.url.toString().replace(/\/$/, "")
  console.log("\nServer running. Try:")
  console.log(` curl -N ${base}/event`)
  console.log("\nPress Ctrl+C to stop.\n")
  while (true) {
    await wait(5000)
    const session = await Session.create({
      title: "test session",
    })
    console.log("created session:", session.id, session.title)
    // send messages to the session
    async function prompt(text: string) {
      console.log(`\n--- sending: "${text}" ---`)
      await SessionPrompt.prompt({
        sessionID: session.id,
        parts: [{ type: "text", text }],
      })
      console.log(`--- done: "${text}" ---`)
    }
    await prompt("What is 2 + 2?")
    await wait(2500)
    await prompt("Now multiply that by 10")
    await wait(2500)
    await prompt("Summarize what we've discussed")
  }
  // (A trailing `await new Promise(() => {})` was unreachable after the
  // infinite loop and has been removed.)
}

await Instance.provide({
  directory: "~/tmp/project-test7",
  fn: run,
})

View File

@@ -0,0 +1,72 @@
import os from "os"
import path from "path"
import fs from "fs/promises"

// Point the XDG env vars at a dedicated directory BEFORE any src/ imports so
// this replica uses its own database/config instead of the real user data.
// NOTE(review): path is hard-coded to one machine; use os.tmpdir() if this
// script is ever meant to be shared.
const dir = path.join("/Users/james/tmp/opencode-test-replicate")
await fs.mkdir(dir, { recursive: true })
process.env["XDG_DATA_HOME"] = path.join(dir, "share")
process.env["XDG_CACHE_HOME"] = path.join(dir, "cache")
process.env["XDG_CONFIG_HOME"] = path.join(dir, "config")
process.env["XDG_STATE_HOME"] = path.join(dir, "state")

// Write the cache version file
const cache = path.join(dir, "cache", "opencode")
await fs.mkdir(cache, { recursive: true })
await fs.writeFile(path.join(cache, "version"), "14")

// Now safe to import src/
const { Log } = await import("@/util/log")
Log.init({ print: true, dev: true, level: "DEBUG" })
const { Instance } = await import("@/project/instance")
const { Database } = await import("@/storage/db")
const { DatabaseEvent } = await import("@/storage/event")
const { parseSSE } = await import("@/control-plane/sse")
// register projectors so apply can find them
await import("@/session/projectors")

const url = process.argv[2] || "http://127.0.0.1:4096/global/db-event"
const ac = new AbortController()
process.on("SIGINT", () => ac.abort())
process.on("SIGTERM", () => ac.abort())

// Connects to the db-event SSE stream and replays every received event into
// the local database, logging each apply.
async function run() {
  const res = await fetch(url, {
    headers: { accept: "text/event-stream" },
    signal: ac.signal,
  })
  if (!res.ok) {
    console.error("failed to connect:", res.status, await res.text())
    process.exit(1)
  }
  if (!res.body) {
    console.error("no response body")
    process.exit(1)
  }
  console.log("connected, listening for events...\n")
  await parseSSE(res.body, ac.signal, (event: any) => {
    console.log("[sse]", JSON.stringify(event, null, 2))
    if (event.type && event.data) {
      try {
        DatabaseEvent.replay(event)
        console.log("[apply] ok:", event.type)
        console.log("db path", Database.Path)
      } catch (err) {
        console.error("[apply] error:", err)
      }
    }
  })
  console.log("\ndisconnected")
  Database.close()
  // await fs.rm(dir, { recursive: true, force: true })
}

// Previously `run()` was fired without handling rejection, so a SIGINT abort
// (or any fetch failure) surfaced as an unhandled promise rejection. Treat an
// abort as a clean shutdown; anything else is a real error.
run().catch((err: unknown) => {
  if (err instanceof Error && err.name === "AbortError") return
  console.error(err)
  process.exit(1)
})

View File

@@ -0,0 +1,114 @@
import { describe, test, expect, beforeEach } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import z from "zod"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { DatabaseEvent } from "../../src/storage/event"
import { Database } from "../../src/storage/db"
import { EventTable } from "../../src/storage/event.sql"
import { Identifier } from "../../src/id/id"

// Fresh database client for every test.
beforeEach(() => {
  Database.Client.reset()
})

describe("DatabaseEvent", () => {
  const Created = DatabaseEvent.define("item.created", "v1", z.object({ id: z.string(), name: z.string() }))
  const Sent = DatabaseEvent.agg("item_id").define("item.sent", "v1", z.object({ item_id: z.string(), to: z.string() }))
  // No-op projectors: these tests exercise only the event log itself.
  DatabaseEvent.addProjector(Created, () => {})
  DatabaseEvent.addProjector(Sent, () => {})

  describe("run", () => {
    test("inserts event row", () => {
      DatabaseEvent.run(Created, { id: "msg_1", name: "first" })
      const rows = Database.use((db) => db.select().from(EventTable).all())
      expect(rows).toHaveLength(1)
      expect(rows[0].name).toBe("item.created.v1")
      expect(rows[0].aggregateId).toBe("msg_1")
    })

    test("increments seq per aggregate", () => {
      DatabaseEvent.run(Created, { id: "msg_1", name: "first" })
      DatabaseEvent.run(Created, { id: "msg_1", name: "second" })
      const rows = Database.use((db) => db.select().from(EventTable).all())
      expect(rows).toHaveLength(2)
      expect(rows[1].seq).toBe(rows[0].seq + 1)
    })

    test("uses custom aggregate field from agg()", () => {
      DatabaseEvent.run(Sent, { item_id: "msg_1", to: "james" })
      const rows = Database.use((db) => db.select().from(EventTable).all())
      expect(rows).toHaveLength(1)
      expect(rows[0].aggregateId).toBe("msg_1")
    })

    test("emits events", async () => {
      await using tmp = await tmpdir()
      // Await the provide call: previously it was fired without awaiting, so
      // the test could finish (and dispose `tmp`) before the assertions ran,
      // silently swallowing failures.
      await Instance.provide({
        directory: tmp.path,
        fn: async () => {
          const dbEvents: DatabaseEvent.BusEvent[] = []
          DatabaseEvent.Bus.on("event", (e) => dbEvents.push(e))
          const events: Array<any> = []
          const unsubscribe = Bus.subscribeAll((e) => events.push(e))
          DatabaseEvent.run(Created, { id: "msg_1", name: "test" })
          expect(events).toHaveLength(1)
          expect(events[0]).toEqual({
            type: "item.created",
            properties: {
              id: "msg_1",
              name: "test",
            },
          })
          expect(dbEvents).toHaveLength(1)
          DatabaseEvent.Bus.removeAllListeners("event")
          unsubscribe()
        },
      })
    })
  })

  describe("replay", () => {
    test("inserts event from external payload", () => {
      const id = Identifier.descending("message")
      DatabaseEvent.replay({
        type: "item.created.v1",
        data: { seq: 1, aggregateId: id, data: { id, name: "replayed" } },
      })
      const rows = Database.use((db) => db.select().from(EventTable).all())
      expect(rows).toHaveLength(1)
      expect(rows[0].aggregateId).toBe(id)
    })

    test("throws on sequence mismatch", () => {
      const id = Identifier.descending("message")
      DatabaseEvent.replay({
        type: "item.created.v1",
        data: { seq: 1, aggregateId: id, data: { id, name: "first" } },
      })
      expect(() =>
        DatabaseEvent.replay({
          type: "item.created.v1",
          data: { seq: 5, aggregateId: id, data: { id, name: "bad" } },
        }),
      ).toThrow(/Sequence mismatch/)
    })

    test("throws on unknown event type", () => {
      expect(() =>
        DatabaseEvent.replay({
          type: "unknown.event.1",
          data: { seq: 0, aggregateId: "x", data: {} },
        }),
      ).toThrow(/Unknown event type/)
    })
  })
})