feat(acp-next): add event routing (#29327)

This commit is contained in:
Shoubhit Dash
2026-05-26 12:17:20 +05:30
committed by GitHub
parent 245f00a4b3
commit 7e5305c765
5 changed files with 552 additions and 30 deletions

View File

@@ -0,0 +1,190 @@
import type { AgentSideConnection } from "@agentclientprotocol/sdk"
import * as Log from "@opencode-ai/core/util/log"
import type {
Event,
EventMessagePartDelta,
EventMessagePartUpdated,
OpencodeClient,
Part,
SessionMessageResponse,
} from "@opencode-ai/sdk/v2"
import { Effect } from "effect"
import { ACPNextSession } from "./session"
const log = Log.create({ service: "acp-next-event" })
type Connection = Pick<AgentSideConnection, "sessionUpdate">
type GlobalEventEnvelope = {
payload?: Event
}
type GlobalEventStream = {
stream: AsyncIterable<GlobalEventEnvelope>
}
export function start(input: {
sdk: OpencodeClient
connection: Connection
session: ACPNextSession.Interface
}) {
const subscription = new Subscription(input)
subscription.start()
return subscription
}
export class Subscription {
private readonly abort = new AbortController()
private started = false
constructor(
private readonly input: {
sdk: OpencodeClient
connection: Connection
session: ACPNextSession.Interface
},
) {}
start() {
if (this.started) return
this.started = true
this.run().catch((error: unknown) => {
if (this.abort.signal.aborted) return
log.error("event subscription failed", { error })
})
}
stop() {
this.abort.abort()
}
async handle(event: Event) {
switch (event.type) {
case "message.part.updated":
return this.handlePartUpdated(event)
case "message.part.delta":
return this.handlePartDelta(event)
}
}
private async run() {
while (!this.abort.signal.aborted) {
const events = (await this.input.sdk.global.event({
signal: this.abort.signal,
})) as GlobalEventStream
for await (const event of events.stream) {
if (this.abort.signal.aborted) return
if (!event.payload) continue
await this.handle(event.payload).catch((error: unknown) => {
log.error("failed to handle event", { error, type: event.payload?.type })
})
}
if (!this.abort.signal.aborted) await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
private async handlePartUpdated(event: EventMessagePartUpdated) {
const part = event.properties.part
const sessionId = part.sessionID || event.properties.sessionID
const session = await Effect.runPromise(this.input.session.tryGet(sessionId))
if (!session) return
await Effect.runPromise(
this.input.session.recordPartMetadata({
sessionId: session.id,
messageId: part.messageID,
partId: part.id,
partType: part.type,
role: part.type === "reasoning" ? "assistant" : undefined,
ignored: part.type === "text" ? part.ignored : undefined,
toolCallId: part.type === "tool" ? part.callID : undefined,
metadata: "metadata" in part ? part.metadata : undefined,
}),
)
}
private async handlePartDelta(event: EventMessagePartDelta) {
const props = event.properties
const session = await Effect.runPromise(this.input.session.tryGet(props.sessionID))
if (!session) return
const known = await Effect.runPromise(
this.input.session.tryGetPartMetadata({
sessionId: session.id,
messageId: props.messageID,
partId: props.partID,
}),
)
const metadata =
known?.role && known.partType
? known
: await this.fetchPartMetadata(session.id, session.cwd, props.messageID, props.partID)
if (metadata?.role !== "assistant") return
if (metadata.partType === "text" && props.field === "text" && metadata.ignored !== true) {
await this.input.connection.sessionUpdate({
sessionId: session.id,
update: {
sessionUpdate: "agent_message_chunk",
messageId: props.messageID,
content: {
type: "text",
text: props.delta,
},
},
})
return
}
if (metadata.partType === "reasoning" && props.field === "text") {
await this.input.connection.sessionUpdate({
sessionId: session.id,
update: {
sessionUpdate: "agent_thought_chunk",
messageId: props.messageID,
content: {
type: "text",
text: props.delta,
},
},
})
}
}
private async fetchPartMetadata(sessionId: string, cwd: string, messageId: string, partId: string) {
const message = await this.input.sdk.session
.message(
{
sessionID: sessionId,
messageID: messageId,
directory: cwd,
},
{ throwOnError: true },
)
.then((response) => response.data)
.catch((error: unknown) => {
log.error("unexpected error when fetching message for delta metadata", { error, messageId, partId })
return undefined
})
if (!message) return
const part = message.parts.find((item) => item.id === partId)
if (!part) return
return await this.recordFetchedPart(sessionId, message, part)
}
private async recordFetchedPart(sessionId: string, message: SessionMessageResponse, part: Part) {
return await Effect.runPromise(
this.input.session.recordPartMetadata({
sessionId,
messageId: part.messageID,
partId: part.id,
partType: part.type,
role: message.info.role,
ignored: part.type === "text" ? part.ignored : undefined,
toolCallId: part.type === "tool" ? part.callID : undefined,
metadata: "metadata" in part ? part.metadata : undefined,
}),
)
}
}
export * as ACPNextEvent from "./event"

View File

@@ -31,11 +31,12 @@ import {
} from "@agentclientprotocol/sdk"
import { InstallationVersion } from "@opencode-ai/core/installation/version"
import * as Log from "@opencode-ai/core/util/log"
import type { OpencodeClient } from "@opencode-ai/sdk/v2"
import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2"
import { Context, Effect, Layer, ManagedRuntime } from "effect"
import * as ACPNextError from "./error"
import { buildConfigOptions, parseModelSelection } from "./config-option"
import { Directory } from "./directory"
import { ACPNextEvent } from "./event"
import { ACPNextSession } from "./session"
import { ModelID, ProviderID } from "@/provider/schema"
import { Provider } from "@/provider/provider"
@@ -71,10 +72,15 @@ export function make(input: {
connection?: Pick<AgentSideConnection, "sessionUpdate">
directory?: Directory.Interface
session?: ACPNextSession.Interface
eventSubscription?: (subscription: ACPNextEvent.Subscription) => void
}): Interface {
const session = input.session ?? makeSessionService()
const directoryService = input.directory ?? makeDirectoryService(input.sdk)
const registeredMcp = new Map<string, Set<string>>()
if (input.connection) {
const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
input.eventSubscription?.(subscription)
}
const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) {
const authMethod: AuthMethod = {
@@ -476,17 +482,13 @@ type SdkResponse<T> = {
}
type MessageInfo = {
readonly role?: string
readonly model?: {
readonly providerID?: string
readonly modelID?: string
readonly variant?: string
}
readonly providerID?: string
readonly modelID?: string
readonly variant?: string
readonly mode?: string
readonly agent?: string
readonly role?: Message["role"]
readonly model?: Extract<Message, { role: "user" }>["model"]
readonly providerID?: Extract<Message, { role: "assistant" }>["providerID"]
readonly modelID?: Extract<Message, { role: "assistant" }>["modelID"]
readonly variant?: Extract<Message, { role: "assistant" }>["variant"]
readonly mode?: Extract<Message, { role: "assistant" }>["mode"]
readonly agent?: Message["agent"]
}
function request<T>(fn: () => Promise<T | SdkResponse<T>>, service?: string) {

View File

@@ -1,4 +1,5 @@
import type { McpServer } from "@agentclientprotocol/sdk"
import type { Message, Part } from "@opencode-ai/sdk/v2"
import { Context, Effect, Layer, Ref } from "effect"
import type { ModelID, ProviderID } from "../provider/schema"
import * as ACPNextError from "./error"
@@ -11,6 +12,9 @@ export type SelectedModel = {
export type KnownMessagePartMetadata = {
messageId: string
partId: string
partType?: Part["type"]
role?: Message["role"]
ignored?: boolean
toolCallId?: string
metadata?: unknown
}
@@ -40,6 +44,9 @@ export type RecordPartMetadataInput = {
sessionId: string
messageId: string
partId: string
partType?: Part["type"]
role?: Message["role"]
ignored?: boolean
toolCallId?: string
metadata?: unknown
}
@@ -146,6 +153,9 @@ export const layer = Layer.effect(
const metadata = {
messageId: input.messageId,
partId: input.partId,
partType: input.partType,
role: input.role,
ignored: input.ignored,
toolCallId: input.toolCallId,
metadata: input.metadata,
}

View File

@@ -1,5 +1,6 @@
import type { AgentSideConnection, Usage } from "@agentclientprotocol/sdk"
import * as Log from "@opencode-ai/core/util/log"
import type { AssistantMessage as OpenCodeAssistantMessage, Message } from "@opencode-ai/sdk/v2"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { ModelID, ProviderID } from "@/provider/schema"
@@ -8,27 +9,14 @@ import { Context, Effect, Layer, SynchronizedRef } from "effect"
const log = Log.create({ service: "acp-next-usage" })
export type AssistantTokenCost = {
readonly cost: number
readonly tokens: {
readonly input: number
readonly output: number
readonly reasoning: number
readonly cache: {
readonly read: number
readonly write: number
}
}
}
export type AssistantTokenCost = Pick<OpenCodeAssistantMessage, "cost" | "tokens">
export type AssistantMessage = AssistantTokenCost & {
readonly role: "assistant"
readonly providerID?: string
readonly modelID?: string
}
export type AssistantMessage = AssistantTokenCost &
Pick<OpenCodeAssistantMessage, "role"> &
Partial<Pick<OpenCodeAssistantMessage, "providerID" | "modelID">>
export type SessionMessage = {
readonly info: { readonly role: string } | AssistantMessage
readonly info: { readonly role: Message["role"] } | AssistantMessage
}
export type MessagesInput = {

View File

@@ -0,0 +1,332 @@
import { describe, expect, it } from "bun:test"
import type { AgentSideConnection } from "@agentclientprotocol/sdk"
import type { Event, Message, OpencodeClient, Part, SessionMessageResponse } from "@opencode-ai/sdk/v2"
import { Effect, ManagedRuntime } from "effect"
import { ACPNextEvent } from "@/acp-next/event"
import * as ACPNextService from "@/acp-next/service"
import { Directory } from "@/acp-next/directory"
import { ACPNextSession } from "@/acp-next/session"
type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
type GlobalEventEnvelope = {
payload?: Event
}
type DeltaPartType = Extract<Part, { type: "text" | "reasoning" }>["type"]
const pollUntil = async (
check: () => boolean | Promise<boolean>,
message: string,
opts?: { timeoutMs?: number; intervalMs?: number },
) => {
const started = Date.now()
while (true) {
if (await check()) return
if (Date.now() - started > (opts?.timeoutMs ?? 2000)) throw new Error(message)
await new Promise((resolve) => setTimeout(resolve, opts?.intervalMs ?? 5))
}
}
function makeSessionService() {
return ManagedRuntime.make(ACPNextSession.defaultLayer).runSync(
ACPNextSession.Service.use((service) => Effect.succeed(service)),
)
}
function createEventStream() {
const queue: GlobalEventEnvelope[] = []
const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
const state = { closed: false }
const push = (event: GlobalEventEnvelope) => {
const waiter = waiters.shift()
if (waiter) {
waiter(event)
return
}
queue.push(event)
}
const close = () => {
state.closed = true
for (const waiter of waiters.splice(0)) {
waiter(undefined)
}
}
const stream = async function* (signal?: AbortSignal) {
while (true) {
if (signal?.aborted) return
const next = queue.shift()
if (next) {
yield next
continue
}
if (state.closed) return
const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
waiters.push(resolve)
signal?.addEventListener("abort", () => resolve(undefined), { once: true })
})
if (!value) return
yield value
}
}
return { push, close, stream }
}
function createHarness(messages: Record<string, SessionMessageResponse> = {}) {
const updates: SessionUpdateParams[] = []
const calls = {
eventSubscribe: 0,
message: 0,
}
const events = createEventStream()
const sdk = {
global: {
event: (options?: { signal?: AbortSignal }) => {
calls.eventSubscribe++
return Promise.resolve({ stream: events.stream(options?.signal) })
},
},
session: {
message: (input: { messageID: string }) => {
calls.message++
return Promise.resolve({ data: messages[input.messageID] })
},
get: () => Promise.resolve({ data: { id: "ses_loaded" } }),
messages: () => Promise.resolve({ data: [] }),
},
} as unknown as OpencodeClient
const connection = {
sessionUpdate: (params: SessionUpdateParams) => {
updates.push(params)
return Promise.resolve()
},
} satisfies Pick<AgentSideConnection, "sessionUpdate">
const session = makeSessionService()
const subscription = new ACPNextEvent.Subscription({ sdk, connection, session })
return { calls, connection, events, sdk, session, subscription, updates }
}
function textDelta(sessionID: string, messageID: string, partID: string, delta: string): Event {
return {
id: `evt_${sessionID}_${messageID}_${partID}_${delta}`,
type: "message.part.delta",
properties: {
sessionID,
messageID,
partID,
field: "text",
delta,
},
}
}
function partUpdated(sessionID: string, messageID: string, partID: string, type: DeltaPartType): Event {
return {
id: `evt_${sessionID}_${messageID}_${partID}`,
type: "message.part.updated",
properties: {
sessionID,
time: Date.now(),
part:
type === "text"
? {
id: partID,
sessionID,
messageID,
type: "text",
text: "",
}
: {
id: partID,
sessionID,
messageID,
type: "reasoning",
text: "",
time: { start: Date.now() },
},
},
}
}
function assistantMessage(sessionID: string, messageID: string, partID: string, type: DeltaPartType) {
return {
info: {
id: messageID,
sessionID,
role: "assistant",
time: { created: Date.now() },
parentID: "msg_parent",
modelID: "model",
providerID: "provider",
mode: "build",
agent: "build",
path: { cwd: "/workspace", root: "/workspace" },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
},
parts: [
type === "text"
? {
id: partID,
sessionID,
messageID,
type: "text",
text: "",
}
: {
id: partID,
sessionID,
messageID,
type: "reasoning",
text: "",
time: { start: Date.now() },
},
],
} satisfies SessionMessageResponse
}
async function createKnownSession(
session: ACPNextSession.Interface,
sessionId: string,
part: { messageId: string; partId: string; partType: Part["type"]; role?: Message["role"] },
) {
await Effect.runPromise(session.create({ id: sessionId, cwd: "/workspace" }))
await Effect.runPromise(
session.recordPartMetadata({
sessionId,
messageId: part.messageId,
partId: part.partId,
partType: part.partType,
role: part.role ?? "assistant",
}),
)
}
describe("acp-next event routing", () => {
it("routes message.part.delta by sessionID without cross-session pollution", async () => {
const harness = createHarness()
await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" })
await createKnownSession(harness.session, "ses_b", { messageId: "msg_b", partId: "part_b", partType: "text" })
await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "hello"))
expect(harness.updates.map((update) => update.sessionId)).toEqual(["ses_b"])
expect(harness.updates[0]?.update.sessionUpdate).toBe("agent_message_chunk")
})
it("keeps interleaved sessions isolated for text and reasoning deltas", async () => {
const harness = createHarness()
await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" })
await createKnownSession(harness.session, "ses_b", {
messageId: "msg_b",
partId: "part_b",
partType: "reasoning",
})
await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "A1"))
await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "B1"))
await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "A2"))
await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "B2"))
expect(
harness.updates
.filter((update) => update.sessionId === "ses_a")
.map((update) => update.update.sessionUpdate),
).toEqual(["agent_message_chunk", "agent_message_chunk"])
expect(
harness.updates
.filter((update) => update.sessionId === "ses_b")
.map((update) => update.update.sessionUpdate),
).toEqual(["agent_thought_chunk", "agent_thought_chunk"])
})
it("does not create extra subscriptions on repeated loadSession", async () => {
const harness = createHarness()
let subscription: ACPNextEvent.Subscription | undefined
const service = ACPNextService.make({
sdk: harness.sdk,
connection: harness.connection,
directory: {
get: () =>
Effect.succeed(
Directory.build({
directory: "/workspace",
providers: {},
modes: [],
defaultModeID: "build",
commands: [],
}),
),
refresh: () =>
Effect.succeed(
Directory.build({
directory: "/workspace",
providers: {},
modes: [],
defaultModeID: "build",
commands: [],
}),
),
variants: Directory.variants,
},
session: harness.session,
eventSubscription: (started) => {
subscription = started
},
})
await pollUntil(() => harness.calls.eventSubscribe === 1, "event subscription did not start")
await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] }))
await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] }))
await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] }))
expect(harness.calls.eventSubscribe).toBe(1)
subscription?.stop()
harness.events.close()
})
it("does not call sdk.session.message repeatedly when metadata is known", async () => {
const harness = createHarness()
await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" })
for (const delta of ["a", "b", "c", "d", "e"]) {
await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", delta))
}
expect(harness.calls.message).toBe(0)
expect(harness.updates).toHaveLength(5)
})
it("fetches unknown part metadata once and reuses it for later deltas", async () => {
const harness = createHarness({
msg_a: assistantMessage("ses_a", "msg_a", "part_a", "text"),
})
await Effect.runPromise(harness.session.create({ id: "ses_a", cwd: "/workspace" }))
await harness.subscription.handle(partUpdated("ses_a", "msg_a", "part_a", "text"))
await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "a"))
await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "b"))
expect(harness.calls.message).toBe(1)
expect(harness.updates).toHaveLength(2)
})
it("ignores unknown sessions and live user parts without user_message_chunk duplication", async () => {
const harness = createHarness()
await createKnownSession(harness.session, "ses_user", {
messageId: "msg_user",
partId: "part_user",
partType: "text",
role: "user",
})
await harness.subscription.handle(textDelta("ses_missing", "msg_missing", "part_missing", "ignored"))
await harness.subscription.handle(partUpdated("ses_user", "msg_user", "part_live", "text"))
await harness.subscription.handle(textDelta("ses_user", "msg_user", "part_user", "hello"))
expect(harness.updates).toHaveLength(0)
})
})