mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-16 10:54:52 +00:00
Compare commits
1 Commits
beta
...
kit/ns-sha
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40ca8a6fc4 |
@@ -21,7 +21,7 @@ import { cmd } from "./cmd"
|
||||
import { ModelsDev } from "../../provider/models"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { bootstrap } from "../bootstrap"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { SessionShare } from "@/share"
|
||||
import { Session } from "../../session"
|
||||
import type { SessionID } from "../../session/schema"
|
||||
import { MessageID, PartID } from "../../session/schema"
|
||||
|
||||
@@ -7,7 +7,7 @@ import { bootstrap } from "../bootstrap"
|
||||
import { Database } from "../../storage/db"
|
||||
import { SessionTable, MessageTable, PartTable } from "../../session/session.sql"
|
||||
import { Instance } from "../../project/instance"
|
||||
import { ShareNext } from "../../share/share-next"
|
||||
import { ShareNext } from "../../share"
|
||||
import { EOL } from "os"
|
||||
import { Filesystem } from "../../util/filesystem"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
|
||||
@@ -45,8 +45,8 @@ import { Vcs } from "@/project/vcs"
|
||||
import { Worktree } from "@/worktree"
|
||||
import { Pty } from "@/pty"
|
||||
import { Installation } from "@/installation"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { ShareNext } from "@/share"
|
||||
import { SessionShare } from "@/share"
|
||||
|
||||
export const AppLayer = Layer.mergeAll(
|
||||
AppFileSystem.defaultLayer,
|
||||
|
||||
@@ -5,7 +5,7 @@ import { Plugin } from "@/plugin"
|
||||
import { LSP } from "@/lsp"
|
||||
import { FileWatcher } from "@/file/watcher"
|
||||
import { Format } from "@/format"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { ShareNext } from "@/share"
|
||||
import { File } from "@/file"
|
||||
import { Vcs } from "@/project/vcs"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
|
||||
@@ -10,7 +10,7 @@ import { Command } from "../command"
|
||||
import { Instance } from "./instance"
|
||||
import { Log } from "@/util/log"
|
||||
import { FileWatcher } from "@/file/watcher"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { ShareNext } from "@/share"
|
||||
import * as Effect from "effect/Effect"
|
||||
|
||||
export const InstanceBootstrap = Effect.gen(function* () {
|
||||
|
||||
@@ -9,7 +9,7 @@ import { SessionPrompt } from "../../session/prompt"
|
||||
import { SessionRunState } from "@/session/run-state"
|
||||
import { SessionCompaction } from "../../session/compaction"
|
||||
import { SessionRevert } from "../../session/revert"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { SessionShare } from "@/share"
|
||||
import { SessionStatus } from "@/session/status"
|
||||
import { SessionSummary } from "@/session/summary"
|
||||
import { Todo } from "../../session/todo"
|
||||
|
||||
2
packages/opencode/src/share/index.ts
Normal file
2
packages/opencode/src/share/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * as ShareNext from "./share-next"
|
||||
export * as SessionShare from "./session"
|
||||
@@ -4,56 +4,54 @@ import { SyncEvent } from "@/sync"
|
||||
import { Effect, Layer, Scope, Context } from "effect"
|
||||
import { Config } from "../config"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { ShareNext } from "./share-next"
|
||||
import { ShareNext } from "."
|
||||
|
||||
export namespace SessionShare {
|
||||
export interface Interface {
|
||||
readonly create: (input?: Session.CreateInput) => Effect.Effect<Session.Info>
|
||||
readonly share: (sessionID: SessionID) => Effect.Effect<{ url: string }, unknown>
|
||||
readonly unshare: (sessionID: SessionID) => Effect.Effect<void, unknown>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionShare") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const cfg = yield* Config.Service
|
||||
const session = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const scope = yield* Scope.Scope
|
||||
|
||||
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
|
||||
const conf = yield* cfg.get()
|
||||
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
|
||||
const result = yield* shareNext.create(sessionID)
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
|
||||
)
|
||||
return result
|
||||
})
|
||||
|
||||
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
|
||||
yield* shareNext.remove(sessionID)
|
||||
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
|
||||
})
|
||||
|
||||
const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
|
||||
const result = yield* session.create(input)
|
||||
if (result.parentID) return result
|
||||
const conf = yield* cfg.get()
|
||||
if (!(Flag.OPENCODE_AUTO_SHARE || conf.share === "auto")) return result
|
||||
yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope))
|
||||
return result
|
||||
})
|
||||
|
||||
return Service.of({ create, share, unshare })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(ShareNext.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
)
|
||||
export interface Interface {
|
||||
readonly create: (input?: Session.CreateInput) => Effect.Effect<Session.Info>
|
||||
readonly share: (sessionID: SessionID) => Effect.Effect<{ url: string }, unknown>
|
||||
readonly unshare: (sessionID: SessionID) => Effect.Effect<void, unknown>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionShare") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const cfg = yield* Config.Service
|
||||
const session = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const scope = yield* Scope.Scope
|
||||
|
||||
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
|
||||
const conf = yield* cfg.get()
|
||||
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
|
||||
const result = yield* shareNext.create(sessionID)
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
|
||||
)
|
||||
return result
|
||||
})
|
||||
|
||||
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
|
||||
yield* shareNext.remove(sessionID)
|
||||
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
|
||||
})
|
||||
|
||||
const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
|
||||
const result = yield* session.create(input)
|
||||
if (result.parentID) return result
|
||||
const conf = yield* cfg.get()
|
||||
if (!(Flag.OPENCODE_AUTO_SHARE || conf.share === "auto")) return result
|
||||
yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope))
|
||||
return result
|
||||
})
|
||||
|
||||
return Service.of({ create, share, unshare })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(ShareNext.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
)
|
||||
|
||||
@@ -14,337 +14,335 @@ import { Config } from "@/config"
|
||||
import { Log } from "@/util/log"
|
||||
import { SessionShareTable } from "./share.sql"
|
||||
|
||||
export namespace ShareNext {
|
||||
const log = Log.create({ service: "share-next" })
|
||||
const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
|
||||
const log = Log.create({ service: "share-next" })
|
||||
const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
|
||||
|
||||
export type Api = {
|
||||
create: string
|
||||
sync: (shareID: string) => string
|
||||
remove: (shareID: string) => string
|
||||
data: (shareID: string) => string
|
||||
}
|
||||
export type Api = {
|
||||
create: string
|
||||
sync: (shareID: string) => string
|
||||
remove: (shareID: string) => string
|
||||
data: (shareID: string) => string
|
||||
}
|
||||
|
||||
export type Req = {
|
||||
headers: Record<string, string>
|
||||
api: Api
|
||||
baseUrl: string
|
||||
}
|
||||
export type Req = {
|
||||
headers: Record<string, string>
|
||||
api: Api
|
||||
baseUrl: string
|
||||
}
|
||||
|
||||
const ShareSchema = Schema.Struct({
|
||||
id: Schema.String,
|
||||
url: Schema.String,
|
||||
secret: Schema.String,
|
||||
})
|
||||
export type Share = typeof ShareSchema.Type
|
||||
const ShareSchema = Schema.Struct({
|
||||
id: Schema.String,
|
||||
url: Schema.String,
|
||||
secret: Schema.String,
|
||||
})
|
||||
export type Share = typeof ShareSchema.Type
|
||||
|
||||
type State = {
|
||||
queue: Map<string, { data: Map<string, Data> }>
|
||||
scope: Scope.Closeable
|
||||
}
|
||||
type State = {
|
||||
queue: Map<string, { data: Map<string, Data> }>
|
||||
scope: Scope.Closeable
|
||||
}
|
||||
|
||||
type Data =
|
||||
| {
|
||||
type: "session"
|
||||
data: SDK.Session
|
||||
}
|
||||
| {
|
||||
type: "message"
|
||||
data: SDK.Message
|
||||
}
|
||||
| {
|
||||
type: "part"
|
||||
data: SDK.Part
|
||||
}
|
||||
| {
|
||||
type: "session_diff"
|
||||
data: SDK.SnapshotFileDiff[]
|
||||
}
|
||||
| {
|
||||
type: "model"
|
||||
data: SDK.Model[]
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly init: () => Effect.Effect<void, unknown>
|
||||
readonly url: () => Effect.Effect<string, unknown>
|
||||
readonly request: () => Effect.Effect<Req, unknown>
|
||||
readonly create: (sessionID: SessionID) => Effect.Effect<Share, unknown>
|
||||
readonly remove: (sessionID: SessionID) => Effect.Effect<void, unknown>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/ShareNext") {}
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
function api(resource: string): Api {
|
||||
return {
|
||||
create: `/api/${resource}`,
|
||||
sync: (shareID) => `/api/${resource}/${shareID}/sync`,
|
||||
remove: (shareID) => `/api/${resource}/${shareID}`,
|
||||
data: (shareID) => `/api/${resource}/${shareID}/data`,
|
||||
type Data =
|
||||
| {
|
||||
type: "session"
|
||||
data: SDK.Session
|
||||
}
|
||||
}
|
||||
|
||||
const legacyApi = api("share")
|
||||
const consoleApi = api("shares")
|
||||
|
||||
function key(item: Data) {
|
||||
switch (item.type) {
|
||||
case "session":
|
||||
return "session"
|
||||
case "message":
|
||||
return `message/${item.data.id}`
|
||||
case "part":
|
||||
return `part/${item.data.messageID}/${item.data.id}`
|
||||
case "session_diff":
|
||||
return "session_diff"
|
||||
case "model":
|
||||
return "model"
|
||||
| {
|
||||
type: "message"
|
||||
data: SDK.Message
|
||||
}
|
||||
| {
|
||||
type: "part"
|
||||
data: SDK.Part
|
||||
}
|
||||
| {
|
||||
type: "session_diff"
|
||||
data: SDK.SnapshotFileDiff[]
|
||||
}
|
||||
| {
|
||||
type: "model"
|
||||
data: SDK.Model[]
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly init: () => Effect.Effect<void, unknown>
|
||||
readonly url: () => Effect.Effect<string, unknown>
|
||||
readonly request: () => Effect.Effect<Req, unknown>
|
||||
readonly create: (sessionID: SessionID) => Effect.Effect<Share, unknown>
|
||||
readonly remove: (sessionID: SessionID) => Effect.Effect<void, unknown>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/ShareNext") {}
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
function api(resource: string): Api {
|
||||
return {
|
||||
create: `/api/${resource}`,
|
||||
sync: (shareID) => `/api/${resource}/${shareID}/sync`,
|
||||
remove: (shareID) => `/api/${resource}/${shareID}`,
|
||||
data: (shareID) => `/api/${resource}/${shareID}/data`,
|
||||
}
|
||||
}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const account = yield* Account.Service
|
||||
const bus = yield* Bus.Service
|
||||
const cfg = yield* Config.Service
|
||||
const http = yield* HttpClient.HttpClient
|
||||
const httpOk = HttpClient.filterStatusOk(http)
|
||||
const provider = yield* Provider.Service
|
||||
const session = yield* Session.Service
|
||||
const legacyApi = api("share")
|
||||
const consoleApi = api("shares")
|
||||
|
||||
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
|
||||
return Effect.gen(function* () {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const existing = s.queue.get(sessionID)
|
||||
if (existing) {
|
||||
for (const item of data) {
|
||||
existing.data.set(key(item), item)
|
||||
}
|
||||
return
|
||||
function key(item: Data) {
|
||||
switch (item.type) {
|
||||
case "session":
|
||||
return "session"
|
||||
case "message":
|
||||
return `message/${item.data.id}`
|
||||
case "part":
|
||||
return `part/${item.data.messageID}/${item.data.id}`
|
||||
case "session_diff":
|
||||
return "session_diff"
|
||||
case "model":
|
||||
return "model"
|
||||
}
|
||||
}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const account = yield* Account.Service
|
||||
const bus = yield* Bus.Service
|
||||
const cfg = yield* Config.Service
|
||||
const http = yield* HttpClient.HttpClient
|
||||
const httpOk = HttpClient.filterStatusOk(http)
|
||||
const provider = yield* Provider.Service
|
||||
const session = yield* Session.Service
|
||||
|
||||
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
|
||||
return Effect.gen(function* () {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const existing = s.queue.get(sessionID)
|
||||
if (existing) {
|
||||
for (const item of data) {
|
||||
existing.data.set(key(item), item)
|
||||
}
|
||||
|
||||
const next = new Map(data.map((item) => [key(item), item]))
|
||||
s.queue.set(sessionID, { data: next })
|
||||
yield* flush(sessionID).pipe(
|
||||
Effect.delay(1000),
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share flush failed", { sessionID, cause })
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(s.scope),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
const state: InstanceState.InstanceState<State> = yield* InstanceState.make<State>(
|
||||
Effect.fn("ShareNext.state")(function* (_ctx) {
|
||||
const cache: State = { queue: new Map(), scope: yield* Scope.make() }
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Scope.close(cache.scope, Exit.void).pipe(
|
||||
Effect.andThen(
|
||||
Effect.sync(() => {
|
||||
cache.queue.clear()
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
if (disabled) return cache
|
||||
|
||||
const watch = <D extends { type: string }>(
|
||||
def: D,
|
||||
fn: (evt: { properties: any }) => Effect.Effect<void, unknown>,
|
||||
) =>
|
||||
bus.subscribe(def as never).pipe(
|
||||
Stream.runForEach((evt) =>
|
||||
fn(evt).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share subscriber failed", { type: def.type, cause })
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
yield* watch(Session.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = yield* session.get(evt.properties.sessionID)
|
||||
yield* sync(info.id, [{ type: "session", data: info }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = evt.properties.info
|
||||
yield* sync(info.sessionID, [{ type: "message", data: info }])
|
||||
if (info.role !== "user") return
|
||||
const model = yield* provider.getModel(info.model.providerID, info.model.modelID)
|
||||
yield* sync(info.sessionID, [{ type: "model", data: [model] }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.PartUpdated, (evt) =>
|
||||
sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]),
|
||||
)
|
||||
yield* watch(Session.Event.Diff, (evt) =>
|
||||
sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]),
|
||||
)
|
||||
yield* watch(Session.Event.Deleted, (evt) => remove(evt.properties.sessionID))
|
||||
|
||||
return cache
|
||||
}),
|
||||
)
|
||||
|
||||
const request = Effect.fn("ShareNext.request")(function* () {
|
||||
const headers: Record<string, string> = {}
|
||||
const active = yield* account.active()
|
||||
if (Option.isNone(active) || !active.value.active_org_id) {
|
||||
const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai"
|
||||
return { headers, api: legacyApi, baseUrl } satisfies Req
|
||||
return
|
||||
}
|
||||
|
||||
const token = yield* account.token(active.value.id)
|
||||
if (Option.isNone(token)) {
|
||||
throw new Error("No active account token available for sharing")
|
||||
}
|
||||
|
||||
headers.authorization = `Bearer ${token.value}`
|
||||
headers["x-org-id"] = active.value.active_org_id
|
||||
return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req
|
||||
})
|
||||
|
||||
const get = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const row = yield* db((db) =>
|
||||
db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
|
||||
)
|
||||
if (!row) return
|
||||
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
|
||||
})
|
||||
|
||||
const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const queued = s.queue.get(sessionID)
|
||||
if (!queued) return
|
||||
|
||||
s.queue.delete(sessionID)
|
||||
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = yield* request()
|
||||
const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }),
|
||||
Effect.flatMap((r) => http.execute(r)),
|
||||
)
|
||||
|
||||
if (res.status >= 400) {
|
||||
log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status })
|
||||
}
|
||||
})
|
||||
|
||||
const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) {
|
||||
log.info("full sync", { sessionID })
|
||||
const info = yield* session.get(sessionID)
|
||||
const diffs = yield* session.diff(sessionID)
|
||||
const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID)))
|
||||
const models = yield* Effect.forEach(
|
||||
Array.from(
|
||||
new Map(
|
||||
messages
|
||||
.filter((msg) => msg.info.role === "user")
|
||||
.map((msg) => (msg.info as SDK.UserMessage).model)
|
||||
.map((item) => [`${item.providerID}/${item.modelID}`, item] as const),
|
||||
).values(),
|
||||
),
|
||||
(item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)),
|
||||
{ concurrency: 8 },
|
||||
)
|
||||
|
||||
yield* sync(sessionID, [
|
||||
{ type: "session", data: info },
|
||||
...messages.map((item) => ({ type: "message" as const, data: item.info })),
|
||||
...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))),
|
||||
{ type: "session_diff", data: diffs },
|
||||
{ type: "model", data: models },
|
||||
])
|
||||
})
|
||||
|
||||
const init = Effect.fn("ShareNext.init")(function* () {
|
||||
if (disabled) return
|
||||
yield* InstanceState.get(state)
|
||||
})
|
||||
|
||||
const url = Effect.fn("ShareNext.url")(function* () {
|
||||
return (yield* request()).baseUrl
|
||||
})
|
||||
|
||||
const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) {
|
||||
if (disabled) return { id: "", url: "", secret: "" }
|
||||
log.info("creating share", { sessionID })
|
||||
const req = yield* request()
|
||||
const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ sessionID }),
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)),
|
||||
)
|
||||
yield* db((db) =>
|
||||
db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
.run(),
|
||||
)
|
||||
const s = yield* InstanceState.get(state)
|
||||
yield* full(sessionID).pipe(
|
||||
const next = new Map(data.map((item) => [key(item), item]))
|
||||
s.queue.set(sessionID, { data: next })
|
||||
yield* flush(sessionID).pipe(
|
||||
Effect.delay(1000),
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share full sync failed", { sessionID, cause })
|
||||
log.error("share flush failed", { sessionID, cause })
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(s.scope),
|
||||
)
|
||||
return result
|
||||
})
|
||||
}
|
||||
|
||||
const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
log.info("removing share", { sessionID })
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
const state: InstanceState.InstanceState<State> = yield* InstanceState.make<State>(
|
||||
Effect.fn("ShareNext.state")(function* (_ctx) {
|
||||
const cache: State = { queue: new Map(), scope: yield* Scope.make() }
|
||||
|
||||
const req = yield* request()
|
||||
yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ secret: share.secret }),
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Scope.close(cache.scope, Exit.void).pipe(
|
||||
Effect.andThen(
|
||||
Effect.sync(() => {
|
||||
cache.queue.clear()
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
||||
})
|
||||
if (disabled) return cache
|
||||
|
||||
return Service.of({ init, url, request, create, remove })
|
||||
}),
|
||||
)
|
||||
const watch = <D extends { type: string }>(
|
||||
def: D,
|
||||
fn: (evt: { properties: any }) => Effect.Effect<void, unknown>,
|
||||
) =>
|
||||
bus.subscribe(def as never).pipe(
|
||||
Stream.runForEach((evt) =>
|
||||
fn(evt).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share subscriber failed", { type: def.type, cause })
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Account.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(FetchHttpClient.layer),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
)
|
||||
}
|
||||
yield* watch(Session.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = yield* session.get(evt.properties.sessionID)
|
||||
yield* sync(info.id, [{ type: "session", data: info }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.Updated, (evt) =>
|
||||
Effect.gen(function* () {
|
||||
const info = evt.properties.info
|
||||
yield* sync(info.sessionID, [{ type: "message", data: info }])
|
||||
if (info.role !== "user") return
|
||||
const model = yield* provider.getModel(info.model.providerID, info.model.modelID)
|
||||
yield* sync(info.sessionID, [{ type: "model", data: [model] }])
|
||||
}),
|
||||
)
|
||||
yield* watch(MessageV2.Event.PartUpdated, (evt) =>
|
||||
sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]),
|
||||
)
|
||||
yield* watch(Session.Event.Diff, (evt) =>
|
||||
sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]),
|
||||
)
|
||||
yield* watch(Session.Event.Deleted, (evt) => remove(evt.properties.sessionID))
|
||||
|
||||
return cache
|
||||
}),
|
||||
)
|
||||
|
||||
const request = Effect.fn("ShareNext.request")(function* () {
|
||||
const headers: Record<string, string> = {}
|
||||
const active = yield* account.active()
|
||||
if (Option.isNone(active) || !active.value.active_org_id) {
|
||||
const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai"
|
||||
return { headers, api: legacyApi, baseUrl } satisfies Req
|
||||
}
|
||||
|
||||
const token = yield* account.token(active.value.id)
|
||||
if (Option.isNone(token)) {
|
||||
throw new Error("No active account token available for sharing")
|
||||
}
|
||||
|
||||
headers.authorization = `Bearer ${token.value}`
|
||||
headers["x-org-id"] = active.value.active_org_id
|
||||
return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req
|
||||
})
|
||||
|
||||
const get = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const row = yield* db((db) =>
|
||||
db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
|
||||
)
|
||||
if (!row) return
|
||||
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
|
||||
})
|
||||
|
||||
const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
const s = yield* InstanceState.get(state)
|
||||
const queued = s.queue.get(sessionID)
|
||||
if (!queued) return
|
||||
|
||||
s.queue.delete(sessionID)
|
||||
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = yield* request()
|
||||
const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }),
|
||||
Effect.flatMap((r) => http.execute(r)),
|
||||
)
|
||||
|
||||
if (res.status >= 400) {
|
||||
log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status })
|
||||
}
|
||||
})
|
||||
|
||||
const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) {
|
||||
log.info("full sync", { sessionID })
|
||||
const info = yield* session.get(sessionID)
|
||||
const diffs = yield* session.diff(sessionID)
|
||||
const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID)))
|
||||
const models = yield* Effect.forEach(
|
||||
Array.from(
|
||||
new Map(
|
||||
messages
|
||||
.filter((msg) => msg.info.role === "user")
|
||||
.map((msg) => (msg.info as SDK.UserMessage).model)
|
||||
.map((item) => [`${item.providerID}/${item.modelID}`, item] as const),
|
||||
).values(),
|
||||
),
|
||||
(item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)),
|
||||
{ concurrency: 8 },
|
||||
)
|
||||
|
||||
yield* sync(sessionID, [
|
||||
{ type: "session", data: info },
|
||||
...messages.map((item) => ({ type: "message" as const, data: item.info })),
|
||||
...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))),
|
||||
{ type: "session_diff", data: diffs },
|
||||
{ type: "model", data: models },
|
||||
])
|
||||
})
|
||||
|
||||
const init = Effect.fn("ShareNext.init")(function* () {
|
||||
if (disabled) return
|
||||
yield* InstanceState.get(state)
|
||||
})
|
||||
|
||||
const url = Effect.fn("ShareNext.url")(function* () {
|
||||
return (yield* request()).baseUrl
|
||||
})
|
||||
|
||||
const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) {
|
||||
if (disabled) return { id: "", url: "", secret: "" }
|
||||
log.info("creating share", { sessionID })
|
||||
const req = yield* request()
|
||||
const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ sessionID }),
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)),
|
||||
)
|
||||
yield* db((db) =>
|
||||
db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
.run(),
|
||||
)
|
||||
const s = yield* InstanceState.get(state)
|
||||
yield* full(sessionID).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share full sync failed", { sessionID, cause })
|
||||
}),
|
||||
),
|
||||
Effect.forkIn(s.scope),
|
||||
)
|
||||
return result
|
||||
})
|
||||
|
||||
const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
|
||||
if (disabled) return
|
||||
log.info("removing share", { sessionID })
|
||||
const share = yield* get(sessionID)
|
||||
if (!share) return
|
||||
|
||||
const req = yield* request()
|
||||
yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe(
|
||||
HttpClientRequest.setHeaders(req.headers),
|
||||
HttpClientRequest.bodyJson({ secret: share.secret }),
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
)
|
||||
|
||||
yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
||||
})
|
||||
|
||||
return Service.of({ init, url, request, create, remove })
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Account.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(FetchHttpClient.layer),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@ import { Config } from "../../src/config"
|
||||
import { Provider } from "../../src/provider"
|
||||
import { Session } from "../../src/session"
|
||||
import type { SessionID } from "../../src/session/schema"
|
||||
import { ShareNext } from "../../src/share/share-next"
|
||||
import { ShareNext } from "../../src/share"
|
||||
import { SessionShareTable } from "../../src/share/share.sql"
|
||||
import { Database, eq } from "../../src/storage/db"
|
||||
import { provideTmpdirInstance } from "../fixture/fixture"
|
||||
|
||||
Reference in New Issue
Block a user