mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-02 12:04:36 +00:00
Compare commits
4 Commits
effect-sha
...
effect-syn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89c0db86b9 | ||
|
|
a68395bfef | ||
|
|
2e6d7bb517 | ||
|
|
dc719269b6 |
@@ -1,44 +1,152 @@
|
||||
import type * as SDK from "@opencode-ai/sdk/v2"
|
||||
import { Effect, Layer, Option, Scope, ServiceMap, Stream } from "effect"
|
||||
import { Account } from "@/account"
|
||||
import { Bus } from "@/bus"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { Provider } from "@/provider/provider"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Session } from "@/session"
|
||||
import { MessageV2 } from "@/session/message-v2"
|
||||
import type { SessionID } from "@/session/schema"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { Account } from "@/account"
|
||||
import { Config } from "@/config/config"
|
||||
import { Log } from "@/util/log"
|
||||
import { Provider } from "@/provider/provider"
|
||||
import { ProviderID, ModelID } from "@/provider/schema"
|
||||
import { Session } from "@/session"
|
||||
import type { SessionID } from "@/session/schema"
|
||||
import { MessageV2 } from "@/session/message-v2"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { SessionShareTable } from "./share.sql"
|
||||
import { Log } from "@/util/log"
|
||||
import type * as SDK from "@opencode-ai/sdk/v2"
|
||||
|
||||
export namespace ShareNext {
|
||||
const log = Log.create({ service: "share-next" })
|
||||
|
||||
type ApiEndpoints = {
|
||||
create: string
|
||||
sync: (shareId: string) => string
|
||||
remove: (shareId: string) => string
|
||||
data: (shareId: string) => string
|
||||
}
|
||||
|
||||
function apiEndpoints(resource: string): ApiEndpoints {
|
||||
return {
|
||||
create: `/api/${resource}`,
|
||||
sync: (shareId) => `/api/${resource}/${shareId}/sync`,
|
||||
remove: (shareId) => `/api/${resource}/${shareId}`,
|
||||
data: (shareId) => `/api/${resource}/${shareId}/data`,
|
||||
}
|
||||
}
|
||||
|
||||
const legacyApi = apiEndpoints("share")
|
||||
const consoleApi = apiEndpoints("shares")
|
||||
|
||||
export async function url() {
|
||||
const req = await request()
|
||||
return req.baseUrl
|
||||
}
|
||||
|
||||
export async function request(): Promise<{
|
||||
headers: Record<string, string>
|
||||
api: ApiEndpoints
|
||||
baseUrl: string
|
||||
}> {
|
||||
const headers: Record<string, string> = {}
|
||||
|
||||
const active = await Account.active()
|
||||
if (!active?.active_org_id) {
|
||||
const baseUrl = await Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
|
||||
return { headers, api: legacyApi, baseUrl }
|
||||
}
|
||||
|
||||
const token = await Account.token(active.id)
|
||||
if (!token) {
|
||||
throw new Error("No active account token available for sharing")
|
||||
}
|
||||
|
||||
headers["authorization"] = `Bearer ${token}`
|
||||
headers["x-org-id"] = active.active_org_id
|
||||
return { headers, api: consoleApi, baseUrl: active.url }
|
||||
}
|
||||
|
||||
const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
|
||||
|
||||
export type Api = {
|
||||
create: string
|
||||
sync: (shareID: string) => string
|
||||
remove: (shareID: string) => string
|
||||
data: (shareID: string) => string
|
||||
export async function init() {
|
||||
if (disabled) return
|
||||
Bus.subscribe(Session.Event.Updated, async (evt) => {
|
||||
const session = await Session.get(evt.properties.sessionID)
|
||||
|
||||
await sync(session.id, [
|
||||
{
|
||||
type: "session",
|
||||
data: session,
|
||||
},
|
||||
])
|
||||
})
|
||||
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
|
||||
const info = evt.properties.info
|
||||
await sync(info.sessionID, [
|
||||
{
|
||||
type: "message",
|
||||
data: evt.properties.info,
|
||||
},
|
||||
])
|
||||
if (info.role === "user") {
|
||||
await sync(info.sessionID, [
|
||||
{
|
||||
type: "model",
|
||||
data: [await Provider.getModel(info.model.providerID, info.model.modelID).then((m) => m)],
|
||||
},
|
||||
])
|
||||
}
|
||||
})
|
||||
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
|
||||
await sync(evt.properties.part.sessionID, [
|
||||
{
|
||||
type: "part",
|
||||
data: evt.properties.part,
|
||||
},
|
||||
])
|
||||
})
|
||||
Bus.subscribe(Session.Event.Diff, async (evt) => {
|
||||
await sync(evt.properties.sessionID, [
|
||||
{
|
||||
type: "session_diff",
|
||||
data: evt.properties.diff,
|
||||
},
|
||||
])
|
||||
})
|
||||
}
|
||||
|
||||
export type Req = {
|
||||
headers: Record<string, string>
|
||||
api: Api
|
||||
baseUrl: string
|
||||
export async function create(sessionID: SessionID) {
|
||||
if (disabled) return { id: "", url: "", secret: "" }
|
||||
log.info("creating share", { sessionID })
|
||||
const req = await request()
|
||||
const response = await fetch(`${req.baseUrl}${req.api.create}`, {
|
||||
method: "POST",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ sessionID: sessionID }),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const message = await response.text().catch(() => response.statusText)
|
||||
throw new Error(`Failed to create share (${response.status}): ${message || response.statusText}`)
|
||||
}
|
||||
|
||||
const result = (await response.json()) as { id: string; url: string; secret: string }
|
||||
|
||||
Database.use((db) =>
|
||||
db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
.run(),
|
||||
)
|
||||
fullSync(sessionID)
|
||||
return result
|
||||
}
|
||||
|
||||
export type Share = {
|
||||
id: string
|
||||
url: string
|
||||
secret: string
|
||||
}
|
||||
|
||||
type State = {
|
||||
queue: Map<string, { timeout: ReturnType<typeof setTimeout>; data: Map<string, Data> }>
|
||||
function get(sessionID: SessionID) {
|
||||
const row = Database.use((db) =>
|
||||
db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
|
||||
)
|
||||
if (!row) return
|
||||
return { id: row.id, secret: row.secret, url: row.url }
|
||||
}
|
||||
|
||||
type Data =
|
||||
@@ -63,49 +171,6 @@ export namespace ShareNext {
|
||||
data: SDK.Model[]
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly init: () => Effect.Effect<void, unknown>
|
||||
readonly url: () => Effect.Effect<string, unknown>
|
||||
readonly request: () => Effect.Effect<Req, unknown>
|
||||
readonly create: (sessionID: SessionID) => Effect.Effect<Share, unknown>
|
||||
readonly remove: (sessionID: SessionID) => Effect.Effect<void, unknown>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ShareNext") {}
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
function api(resource: string): Api {
|
||||
return {
|
||||
create: `/api/${resource}`,
|
||||
sync: (shareID) => `/api/${resource}/${shareID}/sync`,
|
||||
remove: (shareID) => `/api/${resource}/${shareID}`,
|
||||
data: (shareID) => `/api/${resource}/${shareID}/data`,
|
||||
}
|
||||
}
|
||||
|
||||
const legacyApi = api("share")
|
||||
const consoleApi = api("shares")
|
||||
|
||||
async function rawRequest(): Promise<Req> {
|
||||
const headers: Record<string, string> = {}
|
||||
const active = await Account.active()
|
||||
if (!active?.active_org_id) {
|
||||
const baseUrl = (await Config.get()).enterprise?.url ?? "https://opncd.ai"
|
||||
return { headers, api: legacyApi, baseUrl }
|
||||
}
|
||||
|
||||
const token = await Account.token(active.id)
|
||||
if (!token) {
|
||||
throw new Error("No active account token available for sharing")
|
||||
}
|
||||
|
||||
headers.authorization = `Bearer ${token}`
|
||||
headers["x-org-id"] = active.active_org_id
|
||||
return { headers, api: consoleApi, baseUrl: active.url }
|
||||
}
|
||||
|
||||
function key(item: Data) {
|
||||
switch (item.type) {
|
||||
case "session":
|
||||
@@ -121,289 +186,102 @@ export namespace ShareNext {
|
||||
}
|
||||
}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const account = yield* Account.Service
|
||||
const bus = yield* Bus.Service
|
||||
const cfg = yield* Config.Service
|
||||
const provider = yield* Provider.Service
|
||||
const session = yield* Session.Service
|
||||
const scope = yield* Scope.Scope
|
||||
|
||||
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
|
||||
return Effect.gen(function* () {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const existing = s.queue.get(sessionID)
|
||||
if (existing) {
|
||||
for (const item of data) {
|
||||
existing.data.set(key(item), item)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const next = new Map(data.map((item) => [key(item), item]))
|
||||
const timeout = setTimeout(
|
||||
InstanceState.bind(() => {
|
||||
void runPromise(() =>
|
||||
flush(sessionID).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share flush failed", { sessionID, cause })
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
}),
|
||||
1000,
|
||||
)
|
||||
s.queue.set(sessionID, { timeout, data: next })
|
||||
})
|
||||
const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()
|
||||
async function sync(sessionID: SessionID, data: Data[]) {
|
||||
if (disabled) return
|
||||
const existing = queue.get(sessionID)
|
||||
if (existing) {
|
||||
for (const item of data) {
|
||||
existing.data.set(key(item), item)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const state: InstanceState<State> = yield* InstanceState.make<State>(
|
||||
Effect.fn("ShareNext.state")(function* (_ctx) {
|
||||
const cache: State = { queue: new Map() }
|
||||
const dataMap = new Map<string, Data>()
|
||||
for (const item of data) {
|
||||
dataMap.set(key(item), item)
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.sync(() => {
|
||||
for (const item of cache.queue.values()) {
|
||||
clearTimeout(item.timeout)
|
||||
}
|
||||
cache.queue.clear()
|
||||
}),
|
||||
)
|
||||
const timeout = setTimeout(async () => {
|
||||
const queued = queue.get(sessionID)
|
||||
if (!queued) return
|
||||
queue.delete(sessionID)
|
||||
const share = get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
if (disabled) return cache
|
||||
|
||||
const watch = <D extends { type: string }>(def: D, fn: (evt: { properties: any }) => Effect.Effect<void>) =>
|
||||
bus.subscribe(def as never).pipe(
|
||||
Stream.runForEach((evt) =>
|
||||
fn(evt).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share subscriber failed", { type: def.type, cause })
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
yield* watch(Session.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = yield* session.get(evt.properties.sessionID)
|
||||
yield* sync(info.id, [{ type: "session", data: info }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = evt.properties.info
|
||||
yield* sync(info.sessionID, [{ type: "message", data: info }])
|
||||
if (info.role !== "user") return
|
||||
const model = yield* provider.getModel(info.model.providerID, info.model.modelID)
|
||||
yield* sync(info.sessionID, [{ type: "model", data: [model] }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.PartUpdated, (evt) =>
|
||||
sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]),
|
||||
)
|
||||
yield* watch(Session.Event.Diff, (evt) =>
|
||||
sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]),
|
||||
)
|
||||
|
||||
return cache
|
||||
const req = await request()
|
||||
const response = await fetch(`${req.baseUrl}${req.api.sync(share.id)}`, {
|
||||
method: "POST",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
secret: share.secret,
|
||||
data: Array.from(queued.data.values()),
|
||||
}),
|
||||
)
|
||||
|
||||
const request = Effect.fn("ShareNext.request")(function* () {
|
||||
const headers: Record<string, string> = {}
|
||||
const active = yield* account.active()
|
||||
if (Option.isNone(active) || !active.value.active_org_id) {
|
||||
const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai"
|
||||
return { headers, api: legacyApi, baseUrl } satisfies Req
|
||||
}
|
||||
|
||||
const token = yield* account.token(active.value.id)
|
||||
if (Option.isNone(token)) {
|
||||
throw new Error("No active account token available for sharing")
|
||||
}
|
||||
|
||||
headers.authorization = `Bearer ${token.value}`
|
||||
headers["x-org-id"] = active.value.active_org_id
|
||||
return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req
|
||||
})
|
||||
|
||||
const get = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const row = yield* db((db) =>
|
||||
db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
|
||||
)
|
||||
if (!row) return
|
||||
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
|
||||
})
|
||||
|
||||
const text = Effect.fnUntraced(function* (res: Response) {
|
||||
return yield* Effect.promise(() => res.text().catch(() => res.statusText))
|
||||
})
|
||||
|
||||
const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const queued = s.queue.get(sessionID)
|
||||
if (!queued) return
|
||||
|
||||
s.queue.delete(sessionID)
|
||||
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = yield* request()
|
||||
const res = yield* Effect.promise(() =>
|
||||
fetch(`${req.baseUrl}${req.api.sync(share.id)}`, {
|
||||
method: "POST",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
secret: share.secret,
|
||||
data: Array.from(queued.data.values()),
|
||||
}),
|
||||
}),
|
||||
)
|
||||
|
||||
if (!res.ok) {
|
||||
log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status })
|
||||
}
|
||||
})
|
||||
|
||||
const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) {
|
||||
log.info("full sync", { sessionID })
|
||||
const info = yield* session.get(sessionID)
|
||||
const diffs = yield* session.diff(sessionID)
|
||||
const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID)))
|
||||
const models = yield* Effect.forEach(
|
||||
Array.from(
|
||||
new Map(
|
||||
messages
|
||||
.filter((msg) => msg.info.role === "user")
|
||||
.map((msg) => (msg.info as SDK.UserMessage).model)
|
||||
.map((item) => [`${item.providerID}/${item.modelID}`, item] as const),
|
||||
).values(),
|
||||
),
|
||||
(item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)),
|
||||
{ concurrency: 8 },
|
||||
)
|
||||
|
||||
yield* sync(sessionID, [
|
||||
{ type: "session", data: info },
|
||||
...messages.map((item) => ({ type: "message" as const, data: item.info })),
|
||||
...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))),
|
||||
{ type: "session_diff", data: diffs },
|
||||
{ type: "model", data: models },
|
||||
])
|
||||
})
|
||||
|
||||
const init = Effect.fn("ShareNext.init")(function* () {
|
||||
if (disabled) return
|
||||
yield* InstanceState.get(state)
|
||||
})
|
||||
|
||||
const url = Effect.fn("ShareNext.url")(function* () {
|
||||
return (yield* request()).baseUrl
|
||||
})
|
||||
|
||||
const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) {
|
||||
if (disabled) return { id: "", url: "", secret: "" }
|
||||
log.info("creating share", { sessionID })
|
||||
const req = yield* request()
|
||||
const res = yield* Effect.promise(() =>
|
||||
fetch(`${req.baseUrl}${req.api.create}`, {
|
||||
method: "POST",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ sessionID }),
|
||||
}),
|
||||
)
|
||||
|
||||
if (!res.ok) {
|
||||
const msg = yield* text(res)
|
||||
throw new Error(`Failed to create share (${res.status}): ${msg || res.statusText}`)
|
||||
}
|
||||
|
||||
const result = (yield* Effect.promise(() => res.json())) as Share
|
||||
yield* db((db) =>
|
||||
db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
.run(),
|
||||
)
|
||||
yield* full(sessionID).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share full sync failed", { sessionID, cause })
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(scope),
|
||||
)
|
||||
return result
|
||||
})
|
||||
|
||||
const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
log.info("removing share", { sessionID })
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = yield* request()
|
||||
const res = yield* Effect.promise(() =>
|
||||
fetch(`${req.baseUrl}${req.api.remove(share.id)}`, {
|
||||
method: "DELETE",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ secret: share.secret }),
|
||||
}),
|
||||
)
|
||||
|
||||
if (!res.ok) {
|
||||
const msg = yield* text(res)
|
||||
throw new Error(`Failed to remove share (${res.status}): ${msg || res.statusText}`)
|
||||
}
|
||||
|
||||
yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
||||
})
|
||||
|
||||
return Service.of({ init, url, request, create, remove })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Account.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function init() {
|
||||
return runPromise((svc) => svc.init())
|
||||
}
|
||||
|
||||
export async function url() {
|
||||
return (await rawRequest()).baseUrl
|
||||
}
|
||||
|
||||
export async function request(): Promise<Req> {
|
||||
return rawRequest()
|
||||
}
|
||||
|
||||
export async function create(sessionID: SessionID) {
|
||||
return runPromise((svc) => svc.create(sessionID))
|
||||
if (!response.ok) {
|
||||
log.warn("failed to sync share", { sessionID, shareID: share.id, status: response.status })
|
||||
}
|
||||
}, 1000)
|
||||
queue.set(sessionID, { timeout, data: dataMap })
|
||||
}
|
||||
|
||||
export async function remove(sessionID: SessionID) {
|
||||
return runPromise((svc) => svc.remove(sessionID))
|
||||
if (disabled) return
|
||||
log.info("removing share", { sessionID })
|
||||
const share = get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = await request()
|
||||
const response = await fetch(`${req.baseUrl}${req.api.remove(share.id)}`, {
|
||||
method: "DELETE",
|
||||
headers: { ...req.headers, "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
secret: share.secret,
|
||||
}),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const message = await response.text().catch(() => response.statusText)
|
||||
throw new Error(`Failed to remove share (${response.status}): ${message || response.statusText}`)
|
||||
}
|
||||
|
||||
Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
||||
}
|
||||
|
||||
async function fullSync(sessionID: SessionID) {
|
||||
log.info("full sync", { sessionID })
|
||||
const session = await Session.get(sessionID)
|
||||
const diffs = await Session.diff(sessionID)
|
||||
const messages = await Array.fromAsync(MessageV2.stream(sessionID))
|
||||
const models = await Promise.all(
|
||||
Array.from(
|
||||
new Map(
|
||||
messages
|
||||
.filter((m) => m.info.role === "user")
|
||||
.map((m) => (m.info as SDK.UserMessage).model)
|
||||
.map((m) => [`${m.providerID}/${m.modelID}`, m] as const),
|
||||
).values(),
|
||||
).map((m) => Provider.getModel(ProviderID.make(m.providerID), ModelID.make(m.modelID)).then((item) => item)),
|
||||
)
|
||||
await sync(sessionID, [
|
||||
{
|
||||
type: "session",
|
||||
data: session,
|
||||
},
|
||||
...messages.map((x) => ({
|
||||
type: "message" as const,
|
||||
data: x.info,
|
||||
})),
|
||||
...messages.flatMap((x) => x.parts.map((y) => ({ type: "part" as const, data: y }))),
|
||||
{
|
||||
type: "session_diff",
|
||||
data: diffs,
|
||||
},
|
||||
{
|
||||
type: "model",
|
||||
data: models,
|
||||
},
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import z from "zod"
|
||||
import type { ZodObject } from "zod"
|
||||
import { EventEmitter } from "events"
|
||||
import { Effect, Layer, ServiceMap } from "effect"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { Database, eq } from "@/storage/db"
|
||||
import { Bus as ProjectBus } from "@/bus"
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
@@ -31,37 +34,18 @@ export namespace SyncEvent {
|
||||
|
||||
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
|
||||
|
||||
export const registry = new Map<string, Definition>()
|
||||
let projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
const versions = new Map<string, number>()
|
||||
let frozen = false
|
||||
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
|
||||
|
||||
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
|
||||
|
||||
export function reset() {
|
||||
frozen = false
|
||||
projectors = undefined
|
||||
convertEvent = (_, data) => data
|
||||
type State = {
|
||||
projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
convert: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
|
||||
bus: EventEmitter<{ event: [{ def: Definition; event: Event }] }>
|
||||
}
|
||||
|
||||
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
|
||||
projectors = new Map(input.projectors)
|
||||
export const registry = new Map<string, Definition>()
|
||||
const versions = new Map<string, number>()
|
||||
let frozen = false
|
||||
|
||||
// Install all the latest event defs to the bus. We only ever emit
|
||||
// latest versions from code, and keep around old versions for
|
||||
// replaying. Replaying does not go through the bus, and it
|
||||
// simplifies the bus to only use unversioned latest events
|
||||
for (let [type, version] of versions.entries()) {
|
||||
let def = registry.get(versionedType(type, version))!
|
||||
|
||||
BusEvent.define(def.type, def.properties || def.schema)
|
||||
}
|
||||
|
||||
// Freeze the system so it clearly errors if events are defined
|
||||
// after `init` which would cause bugs
|
||||
frozen = true
|
||||
convertEvent = input.convertEvent || ((_, data) => data)
|
||||
function noop(_: string, data: Event["data"]) {
|
||||
return data
|
||||
}
|
||||
|
||||
export function versionedType<A extends string>(type: A): A
|
||||
@@ -102,140 +86,224 @@ export namespace SyncEvent {
|
||||
return [def, func as ProjectorFunc]
|
||||
}
|
||||
|
||||
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
|
||||
if (projectors == null) {
|
||||
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
|
||||
}
|
||||
|
||||
const projector = projectors.get(def)
|
||||
if (!projector) {
|
||||
throw new Error(`Projector not found for event: ${def.type}`)
|
||||
}
|
||||
|
||||
// idempotent: need to ignore any events already logged
|
||||
|
||||
Database.transaction((tx) => {
|
||||
projector(tx, event.data)
|
||||
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
tx.insert(EventSequenceTable)
|
||||
.values({
|
||||
aggregate_id: event.aggregateID,
|
||||
seq: event.seq,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: EventSequenceTable.aggregate_id,
|
||||
set: { seq: event.seq },
|
||||
})
|
||||
.run()
|
||||
tx.insert(EventTable)
|
||||
.values({
|
||||
id: event.id,
|
||||
seq: event.seq,
|
||||
aggregate_id: event.aggregateID,
|
||||
type: versionedType(def.type, def.version),
|
||||
data: event.data as Record<string, unknown>,
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
Database.effect(() => {
|
||||
Bus.emit("event", {
|
||||
def,
|
||||
event,
|
||||
})
|
||||
|
||||
if (options?.publish) {
|
||||
const result = convertEvent(def.type, event.data)
|
||||
if (result instanceof Promise) {
|
||||
result.then((data) => {
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
|
||||
})
|
||||
} else {
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
export interface Interface {
|
||||
readonly reset: () => Effect.Effect<void>
|
||||
readonly init: (input: {
|
||||
projectors: Array<[Definition, ProjectorFunc]>
|
||||
convertEvent?: State["convert"]
|
||||
}) => Effect.Effect<void>
|
||||
readonly replay: (event: SerializedEvent, options?: { republish: boolean }) => Effect.Effect<void>
|
||||
readonly run: <Def extends Definition>(def: Def, data: Event<Def>["data"]) => Effect.Effect<void>
|
||||
readonly remove: (aggregateID: string) => Effect.Effect<void>
|
||||
readonly subscribeAll: (handler: (event: { def: Definition; event: Event }) => void) => Effect.Effect<() => void>
|
||||
}
|
||||
|
||||
// TODO:
|
||||
//
|
||||
// * Support applying multiple events at one time. One transaction,
|
||||
// and it validets all the sequence ids
|
||||
// * when loading events from db, apply zod validation to ensure shape
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SyncEvent") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const state: State = {
|
||||
projectors: undefined,
|
||||
convert: noop,
|
||||
bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(),
|
||||
}
|
||||
|
||||
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
|
||||
if (state.projectors == null) {
|
||||
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
|
||||
}
|
||||
|
||||
const projector = state.projectors.get(def)
|
||||
if (!projector) {
|
||||
throw new Error(`Projector not found for event: ${def.type}`)
|
||||
}
|
||||
|
||||
Database.transaction((tx) => {
|
||||
projector(tx, event.data)
|
||||
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
tx.insert(EventSequenceTable)
|
||||
.values({
|
||||
aggregate_id: event.aggregateID,
|
||||
seq: event.seq,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: EventSequenceTable.aggregate_id,
|
||||
set: { seq: event.seq },
|
||||
})
|
||||
.run()
|
||||
tx.insert(EventTable)
|
||||
.values({
|
||||
id: event.id,
|
||||
seq: event.seq,
|
||||
aggregate_id: event.aggregateID,
|
||||
type: versionedType(def.type, def.version),
|
||||
data: event.data as Record<string, unknown>,
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
Database.effect(() => {
|
||||
state.bus.emit("event", { def, event })
|
||||
|
||||
if (!options.publish) return
|
||||
|
||||
const result = state.convert(def.type, event.data)
|
||||
if (result instanceof Promise) {
|
||||
result.then((data) => {
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const reset = Effect.fn("SyncEvent.reset")(() =>
|
||||
Effect.sync(() => {
|
||||
frozen = false
|
||||
state.projectors = undefined
|
||||
state.convert = noop
|
||||
}),
|
||||
)
|
||||
|
||||
const init = Effect.fn("SyncEvent.init")(
|
||||
(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) =>
|
||||
Effect.sync(() => {
|
||||
state.projectors = new Map(input.projectors)
|
||||
|
||||
for (const [type, version] of versions.entries()) {
|
||||
const def = registry.get(versionedType(type, version))!
|
||||
BusEvent.define(def.type, def.properties)
|
||||
}
|
||||
|
||||
frozen = true
|
||||
state.convert = input.convertEvent || noop
|
||||
}),
|
||||
)
|
||||
|
||||
// TODO:
|
||||
//
|
||||
// * Support applying multiple events at one time. One transaction,
|
||||
// and it validets all the sequence ids
|
||||
// * when loading events from db, apply zod validation to ensure shape
|
||||
|
||||
const replay = Effect.fn("SyncEvent.replay")(function* (
|
||||
event: SerializedEvent,
|
||||
options?: { republish: boolean },
|
||||
) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) {
|
||||
return
|
||||
}
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(
|
||||
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
|
||||
)
|
||||
}
|
||||
|
||||
yield* InstanceState.withALS(() => process(def, event, { publish: !!options?.republish }))
|
||||
})
|
||||
|
||||
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
|
||||
def: Def,
|
||||
data: Event<Def>["data"],
|
||||
) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
yield* InstanceState.withALS(() =>
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish: true })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
|
||||
Effect.sync(() => {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
const subscribeAll = Effect.fn("SyncEvent.subscribeAll")(
|
||||
(handler: (event: { def: Definition; event: Event }) => void) =>
|
||||
Effect.sync(() => {
|
||||
state.bus.on("event", handler)
|
||||
return () => state.bus.off("event", handler)
|
||||
}),
|
||||
)
|
||||
|
||||
return Service.of({ reset, init, replay, run, remove, subscribeAll })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer
|
||||
|
||||
const { runSync } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export function reset() {
|
||||
return runSync((svc) => svc.reset())
|
||||
}
|
||||
|
||||
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) {
|
||||
return runSync((svc) => svc.init(input))
|
||||
}
|
||||
|
||||
export function replay(event: SerializedEvent, options?: { republish: boolean }) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) {
|
||||
return
|
||||
}
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
|
||||
}
|
||||
|
||||
process(def, event, { publish: !!options?.republish })
|
||||
return runSync((svc) => svc.replay(event, options))
|
||||
}
|
||||
|
||||
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
// This should never happen: we've enforced it via typescript in
|
||||
// the definition
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
// Note that this is an "immediate" transaction which is critical.
|
||||
// We need to make sure we can safely read and write with nothing
|
||||
// else changing the data from under us
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish: true })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
)
|
||||
return runSync((svc) => svc.run(def, data))
|
||||
}
|
||||
|
||||
export function remove(aggregateID: string) {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
return runSync((svc) => svc.remove(aggregateID))
|
||||
}
|
||||
|
||||
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
|
||||
Bus.on("event", handler)
|
||||
return () => Bus.off("event", handler)
|
||||
return runSync((svc) => svc.subscribeAll(handler))
|
||||
}
|
||||
|
||||
export function payloads() {
|
||||
|
||||
Reference in New Issue
Block a user