Add Effect-native core event system (#27415)

This commit is contained in:
Dax
2026-05-14 20:50:23 -04:00
committed by GitHub
parent 73cdba959b
commit e11e089e42
43 changed files with 1500 additions and 517 deletions

View File

@@ -5,7 +5,8 @@ import { produce, type Draft } from "immer"
import { ModelV2 } from "./model"
import { PluginV2 } from "./plugin"
import { ProviderV2 } from "./provider"
import { Instance } from "./instance"
import { Location } from "./location"
import { EventV2 } from "./event"
type ProviderRecord = {
provider: ProviderV2.Info
@@ -24,6 +25,15 @@ export class ModelNotFoundError extends Schema.TaggedErrorClass<ModelNotFoundErr
modelID: ModelV2.ID,
}) {}
export const Event = {
ModelUpdated: EventV2.define({
type: "catalog.model.updated",
schema: {
model: ModelV2.Info,
},
}),
}
export interface Interface {
readonly provider: {
readonly get: (providerID: ProviderV2.ID) => Effect.Effect<ProviderV2.Info, ProviderNotFoundError>
@@ -57,10 +67,11 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/v2
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
yield* Instance.Service
yield* Location.Service
let records = HashMap.empty<ProviderV2.ID, ProviderRecord>()
let defaultModel: { providerID: ProviderV2.ID; modelID: ModelV2.ID } | undefined
const plugin = yield* PluginV2.Service
const events = yield* EventV2.Service
const resolve = (model: ModelV2.Info) => {
const provider = Option.getOrThrow(HashMap.get(records, model.providerID)).provider
@@ -157,14 +168,12 @@ export const layer = Layer.effect(
)
const updated = yield* plugin.trigger("model.update", {}, { model, cancel: false })
if (updated.cancel) return
const next = new ModelV2.Info({ ...updated.model, id: modelID, providerID })
records = HashMap.set(records, providerID, {
provider: record.provider,
models: HashMap.set(
record.models,
modelID,
new ModelV2.Info({ ...updated.model, id: modelID, providerID }),
),
models: HashMap.set(record.models, modelID, next),
})
yield* events.publish(Event.ModelUpdated, { model: resolve(next) })
return
}),
@@ -257,4 +266,4 @@ export const layer = Layer.effect(
const SMALL_MODEL_RE = /\b(nano|flash|lite|mini|haiku|small|fast)\b/
export const defaultLayer = layer.pipe(Layer.provide(PluginV2.defaultLayer))
export const defaultLayer = layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provide(PluginV2.defaultLayer))

157
packages/core/src/event.ts Normal file
View File

@@ -0,0 +1,157 @@
import { Context, Effect, Layer, Option, PubSub, Schema, Stream } from "effect"
import { Location } from "./location"
import { withStatics } from "./schema"
import { Identifier } from "./util/identifier"
export const ID = Schema.String.pipe(
Schema.brand("Event.ID"),
withStatics((schema) => ({ create: () => schema.make("evt_" + Identifier.ascending()) })),
)
export type ID = typeof ID.Type
export type Definition<Type extends string = string, DataSchema extends Schema.Top = Schema.Top> = {
readonly type: Type
readonly version?: number
readonly aggregate?: string
readonly data: DataSchema
}
export type Data<D extends Definition> = Schema.Schema.Type<D["data"]>
export type Payload<D extends Definition = Definition> = {
readonly id: ID
readonly type: D["type"]
readonly data: Data<D>
readonly version?: number
readonly location?: Location.Ref
readonly metadata?: Record<string, unknown>
}
export type Sync = (event: Payload) => Effect.Effect<void>
export const registry = new Map<string, Definition>()
export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
readonly type: Type
readonly version?: number
readonly aggregate?: string
readonly schema: Fields
}): Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> & Definition<Type, Schema.Struct<Fields>> {
const Data = Schema.Struct(input.schema)
const Payload = Schema.Struct({
id: ID,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
type: Schema.Literal(input.type),
version: Schema.optional(Schema.Number),
location: Schema.optional(Location.Ref),
data: Data,
}).annotate({ identifier: input.type })
const definition = Object.assign(Payload, {
type: input.type,
...(input.version === undefined ? {} : { version: input.version }),
...(input.aggregate === undefined ? {} : { aggregate: input.aggregate }),
data: Data,
})
registry.set(input.type, definition)
return definition as Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> &
Definition<Type, Schema.Struct<Fields>>
}
export function definitions() {
return registry.values().toArray()
}
export interface PublishOptions {
readonly id?: ID
readonly metadata?: Record<string, unknown>
}
export type Unsubscribe = Effect.Effect<void>
export interface Interface {
readonly publish: <D extends Definition>(
definition: D,
data: Data<D>,
options?: PublishOptions,
) => Effect.Effect<Payload<D>>
readonly publishEvent: <D extends Definition>(event: Payload<D>) => Effect.Effect<Payload<D>>
readonly subscribe: <D extends Definition>(definition: D) => Stream.Stream<Payload<D>>
readonly all: () => Stream.Stream<Payload>
readonly sync: (handler: Sync) => Effect.Effect<Unsubscribe>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Event") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const all = yield* PubSub.unbounded<Payload>()
const typed = new Map<string, PubSub.PubSub<Payload>>()
const syncHandlers = new Array<Sync>()
const getOrCreate = (definition: Definition) =>
Effect.gen(function* () {
const existing = typed.get(definition.type)
if (existing) return existing
const pubsub = yield* PubSub.unbounded<Payload>()
typed.set(definition.type, pubsub)
return pubsub
})
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* PubSub.shutdown(all)
yield* Effect.forEach(typed.values(), PubSub.shutdown, { discard: true })
}),
)
function publishEvent<D extends Definition>(event: Payload<D>) {
return Effect.gen(function* () {
for (const sync of syncHandlers) {
yield* sync(event as Payload)
}
const pubsub = typed.get(event.type)
if (pubsub) yield* PubSub.publish(pubsub, event as Payload)
yield* PubSub.publish(all, event as Payload)
return event
})
}
function publish<D extends Definition>(definition: D, data: Data<D>, options?: PublishOptions) {
return Effect.gen(function* () {
const location = Option.getOrUndefined(yield* Effect.serviceOption(Location.Service))
const event = {
id: options?.id ?? ID.create(),
...(options?.metadata ? { metadata: options.metadata } : {}),
type: definition.type,
...(definition.version === undefined ? {} : { version: definition.version }),
...(location ? { location } : {}),
data,
} as Payload<D>
return yield* publishEvent(event)
})
}
const subscribe = <D extends Definition>(definition: D): Stream.Stream<Payload<D>> =>
Stream.unwrap(getOrCreate(definition).pipe(Effect.map((pubsub) => Stream.fromPubSub(pubsub)))).pipe(
Stream.map((event) => event as Payload<D>),
)
const streamAll = (): Stream.Stream<Payload> => Stream.fromPubSub(all)
const sync = (handler: Sync): Effect.Effect<Unsubscribe> =>
Effect.sync(() => {
syncHandlers.push(handler)
return Effect.sync(() => {
const index = syncHandlers.indexOf(handler)
if (index >= 0) syncHandlers.splice(index, 1)
})
})
return Service.of({ publish, publishEvent, subscribe, all: streamAll, sync })
}),
)
export const defaultLayer = layer
export * as EventV2 from "./event"

View File

@@ -1,12 +0,0 @@
import { Layer, LayerMap } from "effect"
import { Instance } from "./instance"
import { Catalog } from "./catalog"
import { PluginBoot } from "./plugin/boot"
export class InstanceServiceMap extends LayerMap.Service<InstanceServiceMap>()("@opencode/example/InstanceServiceMap", {
lookup: (ref: Instance.Ref) => {
const instance = Layer.succeed(Instance.Service, Instance.Service.of(ref))
return Layer.mergeAll(Catalog.defaultLayer, PluginBoot.defaultLayer).pipe(Layer.provide(instance))
},
idleTimeToLive: "5 minutes",
}) {}

View File

@@ -1,10 +0,0 @@
import { Context } from "effect"
export * as Instance from "./instance"
export type Ref = {
readonly directory: string
readonly workspaceID?: string
}
export class Service extends Context.Service<Service, Ref>()("@opencode/Instance") {}

View File

@@ -0,0 +1,12 @@
import { Layer, LayerMap } from "effect"
import { Location } from "./location"
import { Catalog } from "./catalog"
import { PluginBoot } from "./plugin/boot"
export class LocationServiceMap extends LayerMap.Service<LocationServiceMap>()("@opencode/example/LocationServiceMap", {
lookup: (ref: Location.Ref) => {
const location = Layer.succeed(Location.Service, Location.Service.of(ref))
return Layer.mergeAll(Catalog.defaultLayer, PluginBoot.defaultLayer).pipe(Layer.provide(location))
},
idleTimeToLive: "5 minutes",
}) {}

View File

@@ -0,0 +1,11 @@
import { Context, Schema } from "effect"
export * as Location from "./location"
export const Ref = Schema.Struct({
directory: Schema.String,
workspaceID: Schema.optional(Schema.String),
}).annotate({ identifier: "Location.Ref" })
export type Ref = typeof Ref.Type
export class Service extends Context.Service<Service, Ref>()("@opencode/Location") {}

View File

@@ -0,0 +1,402 @@
import { Schema } from "effect"
import { EventV2 } from "./event"
import { ModelV2 } from "./model"
import { NonNegativeInt } from "./schema"
import { Session } from "./session"
import { FileAttachment, Prompt } from "./session-prompt"
import { ToolOutput } from "./tool-output"
import { V2Schema } from "./v2-schema"
export { FileAttachment }
export const Source = Schema.Struct({
start: NonNegativeInt,
end: NonNegativeInt,
text: Schema.String,
}).annotate({
identifier: "session.next.event.source",
})
export type Source = typeof Source.Type
const Base = {
timestamp: V2Schema.DateTimeUtcFromMillis,
sessionID: Session.ID,
}
const options = {
aggregate: "sessionID",
version: 1,
} as const
export const UnknownError = Schema.Struct({
type: Schema.Literal("unknown"),
message: Schema.String,
}).annotate({
identifier: "Session.Error.Unknown",
})
export type UnknownError = typeof UnknownError.Type
export const AgentSwitched = EventV2.define({
type: "session.next.agent.switched",
...options,
schema: {
...Base,
agent: Schema.String,
},
})
export type AgentSwitched = typeof AgentSwitched.Type
export const ModelSwitched = EventV2.define({
type: "session.next.model.switched",
...options,
schema: {
...Base,
model: ModelV2.Ref,
},
})
export type ModelSwitched = typeof ModelSwitched.Type
export const Prompted = EventV2.define({
type: "session.next.prompted",
...options,
schema: {
...Base,
prompt: Prompt,
},
})
export type Prompted = typeof Prompted.Type
export const Synthetic = EventV2.define({
type: "session.next.synthetic",
...options,
schema: {
...Base,
text: Schema.String,
},
})
export type Synthetic = typeof Synthetic.Type
export namespace Shell {
export const Started = EventV2.define({
type: "session.next.shell.started",
...options,
schema: {
...Base,
callID: Schema.String,
command: Schema.String,
},
})
export type Started = typeof Started.Type
export const Ended = EventV2.define({
type: "session.next.shell.ended",
...options,
schema: {
...Base,
callID: Schema.String,
output: Schema.String,
},
})
export type Ended = typeof Ended.Type
}
export namespace Step {
export const Started = EventV2.define({
type: "session.next.step.started",
...options,
schema: {
...Base,
agent: Schema.String,
model: ModelV2.Ref,
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Started = typeof Started.Type
export const Ended = EventV2.define({
type: "session.next.step.ended",
...options,
schema: {
...Base,
finish: Schema.String,
cost: Schema.Finite,
tokens: Schema.Struct({
input: Schema.Finite,
output: Schema.Finite,
reasoning: Schema.Finite,
cache: Schema.Struct({
read: Schema.Finite,
write: Schema.Finite,
}),
}),
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Ended = typeof Ended.Type
export const Failed = EventV2.define({
type: "session.next.step.failed",
...options,
schema: {
...Base,
error: UnknownError,
},
})
export type Failed = typeof Failed.Type
}
export namespace Text {
export const Started = EventV2.define({
type: "session.next.text.started",
...options,
schema: {
...Base,
},
})
export type Started = typeof Started.Type
export const Delta = EventV2.define({
type: "session.next.text.delta",
...options,
schema: {
...Base,
delta: Schema.String,
},
})
export type Delta = typeof Delta.Type
export const Ended = EventV2.define({
type: "session.next.text.ended",
...options,
schema: {
...Base,
text: Schema.String,
},
})
export type Ended = typeof Ended.Type
}
export namespace Reasoning {
export const Started = EventV2.define({
type: "session.next.reasoning.started",
...options,
schema: {
...Base,
reasoningID: Schema.String,
},
})
export type Started = typeof Started.Type
export const Delta = EventV2.define({
type: "session.next.reasoning.delta",
...options,
schema: {
...Base,
reasoningID: Schema.String,
delta: Schema.String,
},
})
export type Delta = typeof Delta.Type
export const Ended = EventV2.define({
type: "session.next.reasoning.ended",
...options,
schema: {
...Base,
reasoningID: Schema.String,
text: Schema.String,
},
})
export type Ended = typeof Ended.Type
}
export namespace Tool {
export namespace Input {
export const Started = EventV2.define({
type: "session.next.tool.input.started",
...options,
schema: {
...Base,
callID: Schema.String,
name: Schema.String,
},
})
export type Started = typeof Started.Type
export const Delta = EventV2.define({
type: "session.next.tool.input.delta",
...options,
schema: {
...Base,
callID: Schema.String,
delta: Schema.String,
},
})
export type Delta = typeof Delta.Type
export const Ended = EventV2.define({
type: "session.next.tool.input.ended",
...options,
schema: {
...Base,
callID: Schema.String,
text: Schema.String,
},
})
export type Ended = typeof Ended.Type
}
export const Called = EventV2.define({
type: "session.next.tool.called",
...options,
schema: {
...Base,
callID: Schema.String,
tool: Schema.String,
input: Schema.Record(Schema.String, Schema.Unknown),
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Called = typeof Called.Type
export const Progress = EventV2.define({
type: "session.next.tool.progress",
...options,
schema: {
...Base,
callID: Schema.String,
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
},
})
export type Progress = typeof Progress.Type
export const Success = EventV2.define({
type: "session.next.tool.success",
...options,
schema: {
...Base,
callID: Schema.String,
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Success = typeof Success.Type
export const Failed = EventV2.define({
type: "session.next.tool.failed",
...options,
schema: {
...Base,
callID: Schema.String,
error: UnknownError,
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Failed = typeof Failed.Type
}
export const RetryError = Schema.Struct({
message: Schema.String,
statusCode: Schema.Finite.pipe(Schema.optional),
isRetryable: Schema.Boolean,
responseHeaders: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}).annotate({
identifier: "session.next.retry_error",
})
export type RetryError = typeof RetryError.Type
export const Retried = EventV2.define({
type: "session.next.retried",
...options,
schema: {
...Base,
attempt: Schema.Finite,
error: RetryError,
},
})
export type Retried = typeof Retried.Type
export namespace Compaction {
export const Started = EventV2.define({
type: "session.next.compaction.started",
...options,
schema: {
...Base,
reason: Schema.Union([Schema.Literal("auto"), Schema.Literal("manual")]),
},
})
export type Started = typeof Started.Type
export const Delta = EventV2.define({
type: "session.next.compaction.delta",
...options,
schema: {
...Base,
text: Schema.String,
},
})
export type Delta = typeof Delta.Type
export const Ended = EventV2.define({
type: "session.next.compaction.ended",
...options,
schema: {
...Base,
text: Schema.String,
include: Schema.String.pipe(Schema.optional),
},
})
export type Ended = typeof Ended.Type
}
export const All = Schema.Union(
[
AgentSwitched,
ModelSwitched,
Prompted,
Synthetic,
Shell.Started,
Shell.Ended,
Step.Started,
Step.Ended,
Step.Failed,
Text.Started,
Text.Delta,
Text.Ended,
Tool.Input.Started,
Tool.Input.Delta,
Tool.Input.Ended,
Tool.Called,
Tool.Progress,
Tool.Success,
Tool.Failed,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,
Retried,
Compaction.Started,
Compaction.Delta,
Compaction.Ended,
],
{
mode: "oneOf",
},
).pipe(Schema.toTaggedUnion("type"))
export type Event = typeof All.Type
export type Type = Event["type"]
export * as SessionEvent from "./session-event"

View File

@@ -0,0 +1,417 @@
import { produce, type WritableDraft } from "immer"
import { SessionEvent } from "./session-event"
import { SessionMessage } from "./session-message"
export type MemoryState = {
messages: SessionMessage.Message[]
}
export interface Adapter<Result> {
readonly getCurrentAssistant: () => SessionMessage.Assistant | undefined
readonly getCurrentCompaction: () => SessionMessage.Compaction | undefined
readonly getCurrentShell: (callID: string) => SessionMessage.Shell | undefined
readonly updateAssistant: (assistant: SessionMessage.Assistant) => void
readonly updateCompaction: (compaction: SessionMessage.Compaction) => void
readonly updateShell: (shell: SessionMessage.Shell) => void
readonly appendMessage: (message: SessionMessage.Message) => void
readonly finish: () => Result
}
export function memory(state: MemoryState): Adapter<MemoryState> {
const activeAssistantIndex = () =>
state.messages.findLastIndex((message) => message.type === "assistant" && !message.time.completed)
const activeCompactionIndex = () => state.messages.findLastIndex((message) => message.type === "compaction")
const activeShellIndex = (callID: string) =>
state.messages.findLastIndex((message) => message.type === "shell" && message.callID === callID)
return {
getCurrentAssistant() {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = state.messages[index]
return assistant?.type === "assistant" ? assistant : undefined
},
getCurrentCompaction() {
const index = activeCompactionIndex()
if (index < 0) return
const compaction = state.messages[index]
return compaction?.type === "compaction" ? compaction : undefined
},
getCurrentShell(callID) {
const index = activeShellIndex(callID)
if (index < 0) return
const shell = state.messages[index]
return shell?.type === "shell" ? shell : undefined
},
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "assistant") return
state.messages[index] = assistant
},
updateCompaction(compaction) {
const index = activeCompactionIndex()
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "compaction") return
state.messages[index] = compaction
},
updateShell(shell) {
const index = activeShellIndex(shell.callID)
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "shell") return
state.messages[index] = shell
},
appendMessage(message) {
state.messages.push(message)
},
finish() {
return state
},
}
}
export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Event): Result {
const currentAssistant = adapter.getCurrentAssistant()
type DraftAssistant = WritableDraft<SessionMessage.Assistant>
type DraftTool = WritableDraft<SessionMessage.AssistantTool>
type DraftText = WritableDraft<SessionMessage.AssistantText>
type DraftReasoning = WritableDraft<SessionMessage.AssistantReasoning>
const latestTool = (assistant: DraftAssistant | undefined, callID?: string) =>
assistant?.content.findLast(
(item): item is DraftTool => item.type === "tool" && (callID === undefined || item.id === callID),
)
const latestText = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftText => item.type === "text")
const latestReasoning = (assistant: DraftAssistant | undefined, reasoningID: string) =>
assistant?.content.findLast((item): item is DraftReasoning => item.type === "reasoning" && item.id === reasoningID)
SessionEvent.All.match(event, {
"session.next.agent.switched": (event) => {
adapter.appendMessage(
new SessionMessage.AgentSwitched({
id: event.id,
type: "agent-switched",
metadata: event.metadata,
agent: event.data.agent,
time: { created: event.data.timestamp },
}),
)
},
"session.next.model.switched": (event) => {
adapter.appendMessage(
new SessionMessage.ModelSwitched({
id: event.id,
type: "model-switched",
metadata: event.metadata,
model: event.data.model,
time: { created: event.data.timestamp },
}),
)
},
"session.next.prompted": (event) => {
adapter.appendMessage(
new SessionMessage.User({
id: event.id,
type: "user",
metadata: event.metadata,
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
references: event.data.prompt.references,
time: { created: event.data.timestamp },
}),
)
},
"session.next.synthetic": (event) => {
adapter.appendMessage(
new SessionMessage.Synthetic({
sessionID: event.data.sessionID,
text: event.data.text,
id: event.id,
type: "synthetic",
time: { created: event.data.timestamp },
}),
)
},
"session.next.shell.started": (event) => {
adapter.appendMessage(
new SessionMessage.Shell({
id: event.id,
type: "shell",
metadata: event.metadata,
callID: event.data.callID,
command: event.data.command,
output: "",
time: { created: event.data.timestamp },
}),
)
},
"session.next.shell.ended": (event) => {
const currentShell = adapter.getCurrentShell(event.data.callID)
if (currentShell) {
adapter.updateShell(
produce(currentShell, (draft) => {
draft.output = event.data.output
draft.time.completed = event.data.timestamp
}),
)
}
},
"session.next.step.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
}),
)
}
adapter.appendMessage(
new SessionMessage.Assistant({
id: event.id,
type: "assistant",
agent: event.data.agent,
model: event.data.model,
time: { created: event.data.timestamp },
content: [],
snapshot: event.data.snapshot ? { start: event.data.snapshot } : undefined,
}),
)
},
"session.next.step.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
draft.finish = event.data.finish
draft.cost = event.data.cost
draft.tokens = event.data.tokens
if (event.data.snapshot) draft.snapshot = { ...draft.snapshot, end: event.data.snapshot }
}),
)
}
},
"session.next.step.failed": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
draft.finish = "error"
draft.error = event.data.error
}),
)
}
},
"session.next.text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "text",
text: "",
})
}),
)
}
},
"session.next.text.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text += event.data.delta
}),
)
}
},
"session.next.text.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text = event.data.text
}),
)
}
},
"session.next.tool.input.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "tool",
id: event.data.callID,
name: event.data.name,
time: {
created: event.data.timestamp,
},
state: {
status: "pending",
input: "",
},
})
}),
)
}
},
"session.next.tool.input.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
// oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string)
if (match && match.state.status === "pending") match.state.input += event.data.delta
}),
)
}
},
"session.next.tool.input.ended": () => {},
"session.next.tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match) {
match.provider = event.data.provider
match.time.ran = event.data.timestamp
match.state = {
status: "running",
input: event.data.input,
structured: {},
content: [],
}
}
}),
)
}
},
"session.next.tool.progress": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state.structured = event.data.structured
match.state.content = [...event.data.content]
}
}),
)
}
},
"session.next.tool.success": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.provider = event.data.provider
match.time.completed = event.data.timestamp
match.state = {
status: "completed",
input: match.state.input,
structured: event.data.structured,
content: [...event.data.content],
}
}
}),
)
}
},
"session.next.tool.failed": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.provider = event.data.provider
match.time.completed = event.data.timestamp
match.state = {
status: "error",
error: event.data.error,
input: match.state.input,
structured: match.state.structured,
content: match.state.content,
}
}
}),
)
}
},
"session.next.reasoning.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "reasoning",
id: event.data.reasoningID,
text: "",
})
}),
)
}
},
"session.next.reasoning.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text += event.data.delta
}),
)
}
},
"session.next.reasoning.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text = event.data.text
}),
)
}
},
"session.next.retried": () => {},
"session.next.compaction.started": (event) => {
adapter.appendMessage(
new SessionMessage.Compaction({
id: event.id,
type: "compaction",
metadata: event.metadata,
reason: event.data.reason,
summary: "",
time: { created: event.data.timestamp },
}),
)
},
"session.next.compaction.delta": (event) => {
const currentCompaction = adapter.getCurrentCompaction()
if (currentCompaction) {
adapter.updateCompaction(
produce(currentCompaction, (draft) => {
draft.summary += event.data.text
}),
)
}
},
"session.next.compaction.ended": (event) => {
const currentCompaction = adapter.getCurrentCompaction()
if (currentCompaction) {
adapter.updateCompaction(
produce(currentCompaction, (draft) => {
draft.summary = event.data.text
draft.include = event.data.include
}),
)
}
},
})
return adapter.finish()
}
export * as SessionMessageUpdater from "./session-message-updater"

View File

@@ -0,0 +1,173 @@
import { Schema } from "effect"
import { Prompt } from "./session-prompt"
import { SessionEvent } from "./session-event"
import { EventV2 } from "./event"
import { ToolOutput } from "./tool-output"
import { V2Schema } from "./v2-schema"
import { ModelV2 } from "./model"
export const ID = EventV2.ID
export type ID = Schema.Schema.Type<typeof ID>
const Base = {
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
}),
}
export class AgentSwitched extends Schema.Class<AgentSwitched>("Session.Message.AgentSwitched")({
...Base,
type: Schema.Literal("agent-switched"),
agent: SessionEvent.AgentSwitched.data.fields.agent,
}) {}
export class ModelSwitched extends Schema.Class<ModelSwitched>("Session.Message.ModelSwitched")({
...Base,
type: Schema.Literal("model-switched"),
model: ModelV2.Ref,
}) {}
export class User extends Schema.Class<User>("Session.Message.User")({
...Base,
text: Prompt.fields.text,
files: Prompt.fields.files,
agents: Prompt.fields.agents,
references: Prompt.fields.references,
type: Schema.Literal("user"),
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
}),
}) {}
export class Synthetic extends Schema.Class<Synthetic>("Session.Message.Synthetic")({
...Base,
sessionID: SessionEvent.Synthetic.data.fields.sessionID,
text: SessionEvent.Synthetic.data.fields.text,
type: Schema.Literal("synthetic"),
}) {}
export class Shell extends Schema.Class<Shell>("Session.Message.Shell")({
...Base,
type: Schema.Literal("shell"),
callID: SessionEvent.Shell.Started.data.fields.callID,
command: SessionEvent.Shell.Started.data.fields.command,
output: Schema.String,
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
completed: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class ToolStatePending extends Schema.Class<ToolStatePending>("Session.Message.ToolState.Pending")({
status: Schema.Literal("pending"),
input: Schema.String,
}) {}
export class ToolStateRunning extends Schema.Class<ToolStateRunning>("Session.Message.ToolState.Running")({
status: Schema.Literal("running"),
input: Schema.Record(Schema.String, Schema.Unknown),
structured: ToolOutput.Structured,
content: ToolOutput.Content.pipe(Schema.Array),
}) {}
export class ToolStateCompleted extends Schema.Class<ToolStateCompleted>("Session.Message.ToolState.Completed")({
status: Schema.Literal("completed"),
input: Schema.Record(Schema.String, Schema.Unknown),
attachments: SessionEvent.FileAttachment.pipe(Schema.Array, Schema.optional),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
}) {}
export class ToolStateError extends Schema.Class<ToolStateError>("Session.Message.ToolState.Error")({
status: Schema.Literal("error"),
input: Schema.Record(Schema.String, Schema.Unknown),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
error: SessionEvent.UnknownError,
}) {}
export const ToolState = Schema.Union([ToolStatePending, ToolStateRunning, ToolStateCompleted, ToolStateError]).pipe(
Schema.toTaggedUnion("status"),
)
export type ToolState = Schema.Schema.Type<typeof ToolState>
export class AssistantTool extends Schema.Class<AssistantTool>("Session.Message.Assistant.Tool")({
type: Schema.Literal("tool"),
id: Schema.String,
name: Schema.String,
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}).pipe(Schema.optional),
state: ToolState,
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
ran: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
completed: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
pruned: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class AssistantText extends Schema.Class<AssistantText>("Session.Message.Assistant.Text")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class AssistantReasoning extends Schema.Class<AssistantReasoning>("Session.Message.Assistant.Reasoning")({
type: Schema.Literal("reasoning"),
id: Schema.String,
text: Schema.String,
}) {}
export const AssistantContent = Schema.Union([AssistantText, AssistantReasoning, AssistantTool]).pipe(
Schema.toTaggedUnion("type"),
)
export type AssistantContent = Schema.Schema.Type<typeof AssistantContent>
export class Assistant extends Schema.Class<Assistant>("Session.Message.Assistant")({
...Base,
type: Schema.Literal("assistant"),
agent: Schema.String,
model: SessionEvent.Step.Started.data.fields.model,
content: AssistantContent.pipe(Schema.Array),
snapshot: Schema.Struct({
start: Schema.String.pipe(Schema.optional),
end: Schema.String.pipe(Schema.optional),
}).pipe(Schema.optional),
finish: Schema.String.pipe(Schema.optional),
cost: Schema.Finite.pipe(Schema.optional),
tokens: Schema.Struct({
input: Schema.Finite,
output: Schema.Finite,
reasoning: Schema.Finite,
cache: Schema.Struct({
read: Schema.Finite,
write: Schema.Finite,
}),
}).pipe(Schema.optional),
error: SessionEvent.Step.Failed.data.fields.error.pipe(Schema.optional),
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
completed: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class Compaction extends Schema.Class<Compaction>("Session.Message.Compaction")({
type: Schema.Literal("compaction"),
reason: SessionEvent.Compaction.Started.data.fields.reason,
summary: Schema.String,
include: Schema.String.pipe(Schema.optional),
...Base,
}) {}
export const Message = Schema.Union([AgentSwitched, ModelSwitched, User, Synthetic, Shell, Assistant, Compaction])
.pipe(Schema.toTaggedUnion("type"))
.annotate({ identifier: "Session.Message" })
export type Message = Schema.Schema.Type<typeof Message>
export type Type = Message["type"]
export * as SessionMessage from "./session-message"

View File

@@ -0,0 +1,13 @@
export * as Session from "./session"
import { Schema } from "effect"
import { withStatics } from "./schema"
import { Identifier } from "./util/identifier"
export const ID = Schema.String.check(Schema.isStartsWith("ses")).pipe(
Schema.brand("SessionID"),
withStatics((schema) => ({
descending: (id?: string) => schema.make(id ?? "ses_" + Identifier.descending()),
})),
)
export type ID = typeof ID.Type

View File

@@ -1,14 +1,21 @@
import { describe, expect } from "bun:test"
import { DateTime, Effect, Layer, Option } from "effect"
import { DateTime, Effect, Fiber, Layer, Option, Stream } from "effect"
import { Catalog } from "@opencode-ai/core/catalog"
import { Instance } from "@opencode-ai/core/instance"
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { ModelV2 } from "@opencode-ai/core/model"
import { PluginV2 } from "@opencode-ai/core/plugin"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { testEffect } from "./lib/effect"
const instanceLayer = Layer.succeed(Instance.Service, Instance.Service.of({ directory: "test" }))
const it = testEffect(Catalog.layer.pipe(Layer.provideMerge(PluginV2.defaultLayer), Layer.provide(instanceLayer)))
const locationLayer = Layer.succeed(Location.Service, Location.Service.of({ directory: "test" }))
const it = testEffect(
Catalog.layer.pipe(
Layer.provideMerge(EventV2.defaultLayer),
Layer.provideMerge(PluginV2.defaultLayer),
Layer.provideMerge(locationLayer),
),
)
describe("CatalogV2", () => {
it.effect("normalizes provider baseURL into endpoint url", () =>
@@ -69,6 +76,31 @@ describe("CatalogV2", () => {
}),
)
it.effect("publishes model updated events", () =>
Effect.gen(function* () {
const catalog = yield* Catalog.Service
const events = yield* EventV2.Service
const providerID = ProviderV2.ID.make("test")
const modelID = ModelV2.ID.make("model")
const fiber = yield* events
.subscribe(Catalog.Event.ModelUpdated)
.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
yield* catalog.provider.update(providerID, () => {})
yield* catalog.model.update(providerID, modelID, (model) => {
model.name = "Updated Model"
})
const event = Array.from(yield* Fiber.join(fiber))[0]
expect(event?.type).toBe("catalog.model.updated")
expect(event?.data.model.providerID).toBe(providerID)
expect(event?.data.model.id).toBe(modelID)
expect(event?.data.model.name).toBe("Updated Model")
expect(event?.location).toEqual({ directory: "test" })
}),
)
it.effect("resolves unknown model endpoint from provider endpoint", () =>
Effect.gen(function* () {
const catalog = yield* Catalog.Service

View File

@@ -0,0 +1,133 @@
import { describe, expect } from "bun:test"
import { Effect, Fiber, Layer, Schema, Stream } from "effect"
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { testEffect } from "./lib/effect"
const locationLayer = Layer.succeed(
Location.Service,
Location.Service.of({ directory: "project", workspaceID: "workspace" }),
)
const it = testEffect(EventV2.layer.pipe(Layer.provideMerge(locationLayer)))
const itWithoutLocation = testEffect(EventV2.layer)
const Message = EventV2.define({
type: "test.message",
schema: {
text: Schema.String,
},
})
const GlobalMessage = EventV2.define({
type: "test.global",
schema: {
text: Schema.String,
},
})
const VersionedMessage = EventV2.define({
type: "test.versioned",
version: 2,
schema: {
text: Schema.String,
},
})
describe("EventV2", () => {
it.effect("publishes events with the current location", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const fiber = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
const event = yield* events.publish(Message, { text: "hello" })
const received = Array.from(yield* Fiber.join(fiber))
expect(received).toEqual([event])
expect(event.type).toBe("test.message")
expect(event).not.toHaveProperty("version")
expect(event.data).toEqual({ text: "hello" })
expect(event.location).toEqual({ directory: "project", workspaceID: "workspace" })
}),
)
itWithoutLocation.effect("omits location when no location is available", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const event = yield* events.publish(GlobalMessage, { text: "hello" })
expect(event).not.toHaveProperty("location")
expect(event.type).toBe("test.global")
}),
)
it.effect("publishes definition version", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const event = yield* events.publish(VersionedMessage, { text: "hello" })
expect(event.type).toBe("test.versioned")
expect(event.version).toBe(2)
}),
)
it.effect("stores definitions in the exported registry", () =>
Effect.sync(() => {
expect(EventV2.registry.get(Message.type)).toBe(Message)
}),
)
it.effect("publishes to typed and wildcard subscriptions", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const typed = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
const wildcard = yield* events.all().pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
const event = yield* events.publish(Message, { text: "hello" })
expect(Array.from(yield* Fiber.join(typed))).toEqual([event])
expect(Array.from(yield* Fiber.join(wildcard))).toEqual([event])
}),
)
it.effect("runs sync handlers inline", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<EventV2.Payload>()
const unsubscribe = yield* events.sync((event) =>
Effect.sync(() => {
received.push(event)
}),
)
const event = yield* events.publish(Message, { text: "hello" })
yield* unsubscribe
yield* events.publish(Message, { text: "after unsubscribe" })
expect(received).toEqual([event])
}),
)
it.effect("runs sync handlers before publishing to streams", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<string>()
const fiber = yield* events.all().pipe(
Stream.take(1),
Stream.runForEach(() => Effect.sync(() => received.push("stream"))),
Effect.forkScoped,
)
yield* events.sync((event) =>
Effect.sync(() => {
received.push(event.type)
}),
)
yield* Effect.yieldNow
yield* events.publish(Message, { text: "hello" })
yield* Fiber.join(fiber)
expect(received).toEqual([Message.type, "stream"])
}),
)
})

View File

@@ -1,7 +1,7 @@
import { describe, expect } from "bun:test"
import { DateTime, Effect, Layer, Option } from "effect"
import { Catalog } from "@opencode-ai/core/catalog"
import { Instance } from "@opencode-ai/core/instance"
import { Location } from "@opencode-ai/core/location"
import { ModelV2 } from "@opencode-ai/core/model"
import { PluginV2 } from "@opencode-ai/core/plugin"
import { OpencodePlugin } from "@opencode-ai/core/plugin/provider/opencode"
@@ -9,7 +9,7 @@ import { ProviderV2 } from "@opencode-ai/core/provider"
import { it, model, provider, withEnv } from "./provider-helper"
const cost = (input: number, output = 0) => [{ input, output, cache: { read: 0, write: 0 } }]
const instanceLayer = Layer.succeed(Instance.Service, Instance.Service.of({ directory: "test" }))
const locationLayer = Layer.succeed(Location.Service, Location.Service.of({ directory: "test" }))
describe("OpencodePlugin", () => {
it.effect("uses a public key and cancels paid models without credentials", () =>
@@ -192,6 +192,6 @@ describe("OpencodePlugin", () => {
const selected = yield* catalog.model.small(providerID)
expect(Option.getOrUndefined(selected)?.id).toBe(ModelV2.ID.make("gpt-5-nano"))
}).pipe(Effect.provide(Catalog.defaultLayer.pipe(Layer.provide(instanceLayer)))),
}).pipe(Effect.provide(Catalog.defaultLayer.pipe(Layer.provide(locationLayer)))),
)
})