diff --git a/packages/opencode/src/acp-next/event.ts b/packages/opencode/src/acp-next/event.ts new file mode 100644 index 0000000000..cffdccf14a --- /dev/null +++ b/packages/opencode/src/acp-next/event.ts @@ -0,0 +1,190 @@ +import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import * as Log from "@opencode-ai/core/util/log" +import type { + Event, + EventMessagePartDelta, + EventMessagePartUpdated, + OpencodeClient, + Part, + SessionMessageResponse, +} from "@opencode-ai/sdk/v2" +import { Effect } from "effect" +import { ACPNextSession } from "./session" + +const log = Log.create({ service: "acp-next-event" }) + +type Connection = Pick +type GlobalEventEnvelope = { + payload?: Event +} +type GlobalEventStream = { + stream: AsyncIterable +} + +export function start(input: { + sdk: OpencodeClient + connection: Connection + session: ACPNextSession.Interface +}) { + const subscription = new Subscription(input) + subscription.start() + return subscription +} + +export class Subscription { + private readonly abort = new AbortController() + private started = false + + constructor( + private readonly input: { + sdk: OpencodeClient + connection: Connection + session: ACPNextSession.Interface + }, + ) {} + + start() { + if (this.started) return + this.started = true + this.run().catch((error: unknown) => { + if (this.abort.signal.aborted) return + log.error("event subscription failed", { error }) + }) + } + + stop() { + this.abort.abort() + } + + async handle(event: Event) { + switch (event.type) { + case "message.part.updated": + return this.handlePartUpdated(event) + case "message.part.delta": + return this.handlePartDelta(event) + } + } + + private async run() { + while (!this.abort.signal.aborted) { + const events = (await this.input.sdk.global.event({ + signal: this.abort.signal, + })) as GlobalEventStream + + for await (const event of events.stream) { + if (this.abort.signal.aborted) return + if (!event.payload) continue + await this.handle(event.payload).catch((error: unknown) => { + log.error("failed to handle event", { error, type: event.payload?.type }) + }) + } + if (!this.abort.signal.aborted) await new Promise((resolve) => setTimeout(resolve, 1000)) + } + } + + private async handlePartUpdated(event: EventMessagePartUpdated) { + const part = event.properties.part + const sessionId = part.sessionID || event.properties.sessionID + const session = await Effect.runPromise(this.input.session.tryGet(sessionId)) + if (!session) return + + await Effect.runPromise( + this.input.session.recordPartMetadata({ + sessionId: session.id, + messageId: part.messageID, + partId: part.id, + partType: part.type, + role: part.type === "reasoning" ? "assistant" : undefined, + ignored: part.type === "text" ? part.ignored : undefined, + toolCallId: part.type === "tool" ? part.callID : undefined, + metadata: "metadata" in part ? part.metadata : undefined, + }), + ) + } + + private async handlePartDelta(event: EventMessagePartDelta) { + const props = event.properties + const session = await Effect.runPromise(this.input.session.tryGet(props.sessionID)) + if (!session) return + + const known = await Effect.runPromise( + this.input.session.tryGetPartMetadata({ + sessionId: session.id, + messageId: props.messageID, + partId: props.partID, + }), + ) + const metadata = + known?.role && known.partType + ? known + : await this.fetchPartMetadata(session.id, session.cwd, props.messageID, props.partID) + if (metadata?.role !== "assistant") return + if (metadata.partType === "text" && props.field === "text" && metadata.ignored !== true) { + await this.input.connection.sessionUpdate({ + sessionId: session.id, + update: { + sessionUpdate: "agent_message_chunk", + messageId: props.messageID, + content: { + type: "text", + text: props.delta, + }, + }, + }) + return + } + + if (metadata.partType === "reasoning" && props.field === "text") { + await this.input.connection.sessionUpdate({ + sessionId: session.id, + update: { + sessionUpdate: "agent_thought_chunk", + messageId: props.messageID, + content: { + type: "text", + text: props.delta, + }, + }, + }) + } + } + + private async fetchPartMetadata(sessionId: string, cwd: string, messageId: string, partId: string) { + const message = await this.input.sdk.session + .message( + { + sessionID: sessionId, + messageID: messageId, + directory: cwd, + }, + { throwOnError: true }, + ) + .then((response) => response.data) + .catch((error: unknown) => { + log.error("unexpected error when fetching message for delta metadata", { error, messageId, partId }) + return undefined + }) + if (!message) return + + const part = message.parts.find((item) => item.id === partId) + if (!part) return + return await this.recordFetchedPart(sessionId, message, part) + } + + private async recordFetchedPart(sessionId: string, message: SessionMessageResponse, part: Part) { + return await Effect.runPromise( + this.input.session.recordPartMetadata({ + sessionId, + messageId: part.messageID, + partId: part.id, + partType: part.type, + role: message.info.role, + ignored: part.type === "text" ? part.ignored : undefined, + toolCallId: part.type === "tool" ? part.callID : undefined, + metadata: "metadata" in part ? part.metadata : undefined, + }), + ) + } +} + +export * as ACPNextEvent from "./event" diff --git a/packages/opencode/src/acp-next/service.ts b/packages/opencode/src/acp-next/service.ts index 5ec9fd48b8..a465264d45 100644 --- a/packages/opencode/src/acp-next/service.ts +++ b/packages/opencode/src/acp-next/service.ts @@ -31,11 +31,12 @@ import { } from "@agentclientprotocol/sdk" import { InstallationVersion } from "@opencode-ai/core/installation/version" import * as Log from "@opencode-ai/core/util/log" -import type { OpencodeClient } from "@opencode-ai/sdk/v2" +import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2" import { Context, Effect, Layer, ManagedRuntime } from "effect" import * as ACPNextError from "./error" import { buildConfigOptions, parseModelSelection } from "./config-option" import { Directory } from "./directory" +import { ACPNextEvent } from "./event" import { ACPNextSession } from "./session" import { ModelID, ProviderID } from "@/provider/schema" import { Provider } from "@/provider/provider" @@ -71,10 +72,15 @@ export function make(input: { connection?: Pick directory?: Directory.Interface session?: ACPNextSession.Interface + eventSubscription?: (subscription: ACPNextEvent.Subscription) => void }): Interface { const session = input.session ?? makeSessionService() const directoryService = input.directory ?? makeDirectoryService(input.sdk) const registeredMcp = new Map>() + if (input.connection) { + const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session }) + input.eventSubscription?.(subscription) + } const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) { const authMethod: AuthMethod = { @@ -476,17 +482,13 @@ type SdkResponse = { } type MessageInfo = { - readonly role?: string - readonly model?: { - readonly providerID?: string - readonly modelID?: string - readonly variant?: string - } - readonly providerID?: string - readonly modelID?: string - readonly variant?: string - readonly mode?: string - readonly agent?: string + readonly role?: Message["role"] + readonly model?: Extract["model"] + readonly providerID?: Extract["providerID"] + readonly modelID?: Extract["modelID"] + readonly variant?: Extract["variant"] + readonly mode?: Extract["mode"] + readonly agent?: Message["agent"] } function request(fn: () => Promise>, service?: string) { diff --git a/packages/opencode/src/acp-next/session.ts b/packages/opencode/src/acp-next/session.ts index 7a969c867f..6ab61f5f0a 100644 --- a/packages/opencode/src/acp-next/session.ts +++ b/packages/opencode/src/acp-next/session.ts @@ -1,4 +1,5 @@ import type { McpServer } from "@agentclientprotocol/sdk" +import type { Message, Part } from "@opencode-ai/sdk/v2" import { Context, Effect, Layer, Ref } from "effect" import type { ModelID, ProviderID } from "../provider/schema" import * as ACPNextError from "./error" @@ -11,6 +12,9 @@ export type SelectedModel = { export type KnownMessagePartMetadata = { messageId: string partId: string + partType?: Part["type"] + role?: Message["role"] + ignored?: boolean toolCallId?: string metadata?: unknown } @@ -40,6 +44,9 @@ export type RecordPartMetadataInput = { sessionId: string messageId: string partId: string + partType?: Part["type"] + role?: Message["role"] + ignored?: boolean toolCallId?: string metadata?: unknown } @@ -146,6 +153,9 @@ export const layer = Layer.effect( const metadata = { messageId: input.messageId, partId: input.partId, + partType: input.partType, + role: input.role, + ignored: input.ignored, toolCallId: input.toolCallId, metadata: input.metadata, } diff --git a/packages/opencode/src/acp-next/usage.ts b/packages/opencode/src/acp-next/usage.ts index a37f370c68..eabbb02af2 100644 --- a/packages/opencode/src/acp-next/usage.ts +++ b/packages/opencode/src/acp-next/usage.ts @@ -1,5 +1,6 @@ import type { AgentSideConnection, Usage } from "@agentclientprotocol/sdk" import * as Log from "@opencode-ai/core/util/log" +import type { AssistantMessage as OpenCodeAssistantMessage, Message } from "@opencode-ai/sdk/v2" import { InstanceRef } from "@/effect/instance-ref" import { InstanceStore } from "@/project/instance-store" import { ModelID, ProviderID } from "@/provider/schema" @@ -8,27 +9,14 @@ import { Context, Effect, Layer, SynchronizedRef } from "effect" const log = Log.create({ service: "acp-next-usage" }) -export type AssistantTokenCost = { - readonly cost: number - readonly tokens: { - readonly input: number - readonly output: number - readonly reasoning: number - readonly cache: { - readonly read: number - readonly write: number - } - } -} +export type AssistantTokenCost = Pick -export type AssistantMessage = AssistantTokenCost & { - readonly role: "assistant" - readonly providerID?: string - readonly modelID?: string -} +export type AssistantMessage = AssistantTokenCost & + Pick & + Partial> export type SessionMessage = { - readonly info: { readonly role: string } | AssistantMessage + readonly info: { readonly role: Message["role"] } | AssistantMessage } export type MessagesInput = { diff --git a/packages/opencode/test/acp-next/event.test.ts b/packages/opencode/test/acp-next/event.test.ts new file mode 100644 index 0000000000..8a4c88f234 --- /dev/null +++ b/packages/opencode/test/acp-next/event.test.ts @@ -0,0 +1,332 @@ +import { describe, expect, it } from "bun:test" +import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import type { Event, Message, OpencodeClient, Part, SessionMessageResponse } from "@opencode-ai/sdk/v2" +import { Effect, ManagedRuntime } from "effect" +import { ACPNextEvent } from "@/acp-next/event" +import * as ACPNextService from "@/acp-next/service" +import { Directory } from "@/acp-next/directory" +import { ACPNextSession } from "@/acp-next/session" + +type SessionUpdateParams = Parameters[0] +type GlobalEventEnvelope = { + payload?: Event +} +type DeltaPartType = Extract["type"] + +const pollUntil = async ( + check: () => boolean | Promise, + message: string, + opts?: { timeoutMs?: number; intervalMs?: number }, +) => { + const started = Date.now() + while (true) { + if (await check()) return + if (Date.now() - started > (opts?.timeoutMs ?? 2000)) throw new Error(message) + await new Promise((resolve) => setTimeout(resolve, opts?.intervalMs ?? 5)) + } +} + +function makeSessionService() { + return ManagedRuntime.make(ACPNextSession.defaultLayer).runSync( + ACPNextSession.Service.use((service) => Effect.succeed(service)), + ) +} + +function createEventStream() { + const queue: GlobalEventEnvelope[] = [] + const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = [] + const state = { closed: false } + + const push = (event: GlobalEventEnvelope) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const close = () => { + state.closed = true + for (const waiter of waiters.splice(0)) { + waiter(undefined) + } + } + + const stream = async function* (signal?: AbortSignal) { + while (true) { + if (signal?.aborted) return + const next = queue.shift() + if (next) { + yield next + continue + } + if (state.closed) return + const value = await new Promise((resolve) => { + waiters.push(resolve) + signal?.addEventListener("abort", () => resolve(undefined), { once: true }) + }) + if (!value) return + yield value + } + } + + return { push, close, stream } +} + +function createHarness(messages: Record = {}) { + const updates: SessionUpdateParams[] = [] + const calls = { + eventSubscribe: 0, + message: 0, + } + const events = createEventStream() + const sdk = { + global: { + event: (options?: { signal?: AbortSignal }) => { + calls.eventSubscribe++ + return Promise.resolve({ stream: events.stream(options?.signal) }) + }, + }, + session: { + message: (input: { messageID: string }) => { + calls.message++ + return Promise.resolve({ data: messages[input.messageID] }) + }, + get: () => Promise.resolve({ data: { id: "ses_loaded" } }), + messages: () => Promise.resolve({ data: [] }), + }, + } as unknown as OpencodeClient + const connection = { + sessionUpdate: (params: SessionUpdateParams) => { + updates.push(params) + return Promise.resolve() + }, + } satisfies Pick + const session = makeSessionService() + const subscription = new ACPNextEvent.Subscription({ sdk, connection, session }) + + return { calls, connection, events, sdk, session, subscription, updates } +} + +function textDelta(sessionID: string, messageID: string, partID: string, delta: string): Event { + return { + id: `evt_${sessionID}_${messageID}_${partID}_${delta}`, + type: "message.part.delta", + properties: { + sessionID, + messageID, + partID, + field: "text", + delta, + }, + } +} + +function partUpdated(sessionID: string, messageID: string, partID: string, type: DeltaPartType): Event { + return { + id: `evt_${sessionID}_${messageID}_${partID}`, + type: "message.part.updated", + properties: { + sessionID, + time: Date.now(), + part: + type === "text" + ? { + id: partID, + sessionID, + messageID, + type: "text", + text: "", + } + : { + id: partID, + sessionID, + messageID, + type: "reasoning", + text: "", + time: { start: Date.now() }, + }, + }, + } +} + +function assistantMessage(sessionID: string, messageID: string, partID: string, type: DeltaPartType) { + return { + info: { + id: messageID, + sessionID, + role: "assistant", + time: { created: Date.now() }, + parentID: "msg_parent", + modelID: "model", + providerID: "provider", + mode: "build", + agent: "build", + path: { cwd: "/workspace", root: "/workspace" }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + parts: [ + type === "text" + ? { + id: partID, + sessionID, + messageID, + type: "text", + text: "", + } + : { + id: partID, + sessionID, + messageID, + type: "reasoning", + text: "", + time: { start: Date.now() }, + }, + ], + } satisfies SessionMessageResponse +} + +async function createKnownSession( + session: ACPNextSession.Interface, + sessionId: string, + part: { messageId: string; partId: string; partType: Part["type"]; role?: Message["role"] }, +) { + await Effect.runPromise(session.create({ id: sessionId, cwd: "/workspace" })) + await Effect.runPromise( + session.recordPartMetadata({ + sessionId, + messageId: part.messageId, + partId: part.partId, + partType: part.partType, + role: part.role ?? "assistant", + }), + ) +} + +describe("acp-next event routing", () => { + it("routes message.part.delta by sessionID without cross-session pollution", async () => { + const harness = createHarness() + await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" }) + await createKnownSession(harness.session, "ses_b", { messageId: "msg_b", partId: "part_b", partType: "text" }) + + await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "hello")) + + expect(harness.updates.map((update) => update.sessionId)).toEqual(["ses_b"]) + expect(harness.updates[0]?.update.sessionUpdate).toBe("agent_message_chunk") + }) + + it("keeps interleaved sessions isolated for text and reasoning deltas", async () => { + const harness = createHarness() + await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" }) + await createKnownSession(harness.session, "ses_b", { + messageId: "msg_b", + partId: "part_b", + partType: "reasoning", + }) + + await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "A1")) + await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "B1")) + await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "A2")) + await harness.subscription.handle(textDelta("ses_b", "msg_b", "part_b", "B2")) + + expect( + harness.updates + .filter((update) => update.sessionId === "ses_a") + .map((update) => update.update.sessionUpdate), + ).toEqual(["agent_message_chunk", "agent_message_chunk"]) + expect( + harness.updates + .filter((update) => update.sessionId === "ses_b") + .map((update) => update.update.sessionUpdate), + ).toEqual(["agent_thought_chunk", "agent_thought_chunk"]) + }) + + it("does not create extra subscriptions on repeated loadSession", async () => { + const harness = createHarness() + let subscription: ACPNextEvent.Subscription | undefined + const service = ACPNextService.make({ + sdk: harness.sdk, + connection: harness.connection, + directory: { + get: () => + Effect.succeed( + Directory.build({ + directory: "/workspace", + providers: {}, + modes: [], + defaultModeID: "build", + commands: [], + }), + ), + refresh: () => + Effect.succeed( + Directory.build({ + directory: "/workspace", + providers: {}, + modes: [], + defaultModeID: "build", + commands: [], + }), + ), + variants: Directory.variants, + }, + session: harness.session, + eventSubscription: (started) => { + subscription = started + }, + }) + + await pollUntil(() => harness.calls.eventSubscribe === 1, "event subscription did not start") + await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] })) + await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] })) + await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] })) + + expect(harness.calls.eventSubscribe).toBe(1) + subscription?.stop() + harness.events.close() + }) + + it("does not call sdk.session.message repeatedly when metadata is known", async () => { + const harness = createHarness() + await createKnownSession(harness.session, "ses_a", { messageId: "msg_a", partId: "part_a", partType: "text" }) + + for (const delta of ["a", "b", "c", "d", "e"]) { + await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", delta)) + } + + expect(harness.calls.message).toBe(0) + expect(harness.updates).toHaveLength(5) + }) + + it("fetches unknown part metadata once and reuses it for later deltas", async () => { + const harness = createHarness({ + msg_a: assistantMessage("ses_a", "msg_a", "part_a", "text"), + }) + await Effect.runPromise(harness.session.create({ id: "ses_a", cwd: "/workspace" })) + + await harness.subscription.handle(partUpdated("ses_a", "msg_a", "part_a", "text")) + await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "a")) + await harness.subscription.handle(textDelta("ses_a", "msg_a", "part_a", "b")) + + expect(harness.calls.message).toBe(1) + expect(harness.updates).toHaveLength(2) + }) + + it("ignores unknown sessions and live user parts without user_message_chunk duplication", async () => { + const harness = createHarness() + await createKnownSession(harness.session, "ses_user", { + messageId: "msg_user", + partId: "part_user", + partType: "text", + role: "user", + }) + + await harness.subscription.handle(textDelta("ses_missing", "msg_missing", "part_missing", "ignored")) + await harness.subscription.handle(partUpdated("ses_user", "msg_user", "part_live", "text")) + await harness.subscription.handle(textDelta("ses_user", "msg_user", "part_user", "hello")) + + expect(harness.updates).toHaveLength(0) + }) +})