feat(acp-next): add usage service (#29249)

This commit is contained in:
Shoubhit Dash
2026-05-25 22:17:11 +05:30
committed by GitHub
parent 7499021132
commit 249381e742
2 changed files with 565 additions and 0 deletions

View File

@@ -0,0 +1,251 @@
import type { AgentSideConnection, Usage } from "@agentclientprotocol/sdk"
import * as Log from "@opencode-ai/core/util/log"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { ModelID, ProviderID } from "@/provider/schema"
import { Provider } from "@/provider/provider"
import { Context, Effect, Layer, SynchronizedRef } from "effect"
const log = Log.create({ service: "acp-next-usage" })
export type AssistantTokenCost = {
readonly cost: number
readonly tokens: {
readonly input: number
readonly output: number
readonly reasoning: number
readonly cache: {
readonly read: number
readonly write: number
}
}
}
export type AssistantMessage = AssistantTokenCost & {
readonly role: "assistant"
readonly providerID?: string
readonly modelID?: string
}
export type SessionMessage = {
readonly info: { readonly role: string } | AssistantMessage
}
export type MessagesInput = {
readonly sessionID: string
readonly directory: string
}
export type SDK = {
readonly session: {
readonly messages: (
parameters: { readonly sessionID: string; readonly directory: string },
options: { readonly throwOnError: true },
) => Promise<{ readonly data?: readonly SessionMessage[] | null }>
}
}
export interface MessageLoaderInterface {
readonly messages: (input: MessagesInput) => Effect.Effect<readonly SessionMessage[], unknown>
}
export interface ContextLimitLoaderInterface {
readonly providers: (directory: string) => Effect.Effect<Record<ProviderID, Provider.Info>, unknown>
}
export type UsageConnection = Pick<AgentSideConnection, "sessionUpdate">
export interface Interface {
readonly buildUsage: (message: AssistantTokenCost) => Usage
readonly latestAssistantMessage: (messages: readonly SessionMessage[]) => AssistantMessage | undefined
readonly totalSessionCost: (messages: readonly SessionMessage[]) => number
readonly contextLimit: (input: {
readonly directory: string
readonly providerID: ProviderID
readonly modelID: ModelID
}) => Effect.Effect<number | undefined>
readonly sendUpdate: (input: {
readonly connection: UsageConnection
readonly sessionID: string
readonly directory: string
}) => Effect.Effect<void>
}
export class MessageLoader extends Context.Service<MessageLoader, MessageLoaderInterface>()(
"@opencode/ACPNextUsageMessageLoader",
) {}
export class ContextLimitLoader extends Context.Service<ContextLimitLoader, ContextLimitLoaderInterface>()(
"@opencode/ACPNextUsageContextLimitLoader",
) {}
export class Service extends Context.Service<Service, Interface>()("@opencode/ACPNextUsage") {}
export function messageLoaderFromSDK(sdk: SDK): MessageLoaderInterface {
return MessageLoader.of({
messages: (input) =>
Effect.promise(() =>
sdk.session.messages({ sessionID: input.sessionID, directory: input.directory }, { throwOnError: true }).then(
(response) => response.data ?? [],
),
),
})
}
export const messageLoaderLayer = (sdk: SDK) => Layer.succeed(MessageLoader, messageLoaderFromSDK(sdk))
export function buildUsage(message: AssistantTokenCost): Usage {
const cachedReadTokens = message.tokens.cache.read
const cachedWriteTokens = message.tokens.cache.write
const thoughtTokens = message.tokens.reasoning
return {
inputTokens: message.tokens.input,
outputTokens: message.tokens.output,
totalTokens:
message.tokens.input + message.tokens.output + thoughtTokens + cachedReadTokens + cachedWriteTokens,
...(thoughtTokens > 0 ? { thoughtTokens } : {}),
...(cachedReadTokens > 0 ? { cachedReadTokens } : {}),
...(cachedWriteTokens > 0 ? { cachedWriteTokens } : {}),
}
}
export function latestAssistantMessage(messages: readonly SessionMessage[]): AssistantMessage | undefined {
return messages
.filter((message): message is { readonly info: AssistantMessage } => message.info.role === "assistant")
.at(-1)?.info
}
export function totalSessionCost(messages: readonly SessionMessage[]): number {
return messages
.filter((message): message is { readonly info: AssistantMessage } => message.info.role === "assistant")
.reduce((sum, message) => sum + message.info.cost, 0)
}
export function findContextLimit(
providers: Record<ProviderID, Provider.Info>,
providerID: ProviderID,
modelID: ModelID,
): number | undefined {
return providers[providerID]?.models[modelID]?.limit.context
}
export const contextLimitLoaderLayer = Layer.effect(
ContextLimitLoader,
Effect.gen(function* () {
const store = yield* InstanceStore.Service
const provider = yield* Provider.Service
return ContextLimitLoader.of({
providers: Effect.fn("ACPNextUsageContextLimitLoader.providers")(function* (directory) {
const ctx = yield* store.load({ directory })
return yield* Effect.gen(function* () {
return yield* provider.list()
}).pipe(Effect.provideService(InstanceRef, ctx))
}),
})
}),
)
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const messageLoader = yield* MessageLoader
const contextLimitLoader = yield* ContextLimitLoader
const limits = yield* SynchronizedRef.make(new Map<string, Effect.Effect<number | undefined>>())
const cachedLimit = Effect.fnUntraced(function* (input: {
readonly directory: string
readonly providerID: ProviderID
readonly modelID: ModelID
}) {
return yield* SynchronizedRef.modifyEffect(
limits,
Effect.fnUntraced(function* (items) {
const key = `${input.directory}\u0000${input.providerID}\u0000${input.modelID}`
const current = items.get(key)
if (current) return [current, items] as const
const next = yield* Effect.cached(
contextLimitLoader.providers(input.directory).pipe(
Effect.map((providers) => findContextLimit(providers, input.providerID, input.modelID)),
Effect.catch((error) =>
Effect.sync(() => {
log.error("failed to get providers for usage context limit", { error })
return undefined
}),
),
),
)
return [next, new Map(items).set(key, next)] as const
}),
)
})
const contextLimit = Effect.fn("ACPNextUsage.contextLimit")(function* (input: {
readonly directory: string
readonly providerID: ProviderID
readonly modelID: ModelID
}) {
return yield* (yield* cachedLimit(input))
})
const sendUpdate = Effect.fn("ACPNextUsage.sendUpdate")(function* (input: {
readonly connection: UsageConnection
readonly sessionID: string
readonly directory: string
}) {
const messages = yield* messageLoader.messages({ sessionID: input.sessionID, directory: input.directory }).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.error("failed to fetch messages for usage update", { error })
return undefined
}),
),
)
if (!messages) return
const message = latestAssistantMessage(messages)
if (!message) return
if (!message.providerID || !message.modelID) return
const size = yield* contextLimit({
directory: input.directory,
providerID: ProviderID.make(message.providerID),
modelID: ModelID.make(message.modelID),
})
if (!size) return
yield* Effect.promise(() =>
input.connection
.sessionUpdate({
sessionId: input.sessionID,
update: {
sessionUpdate: "usage_update",
used: message.tokens.input + message.tokens.cache.read,
size,
cost: { amount: totalSessionCost(messages), currency: "USD" },
},
})
.catch((error) => {
log.error("failed to send usage update", { error })
}),
)
})
return Service.of({
buildUsage,
latestAssistantMessage,
totalSessionCost,
contextLimit,
sendUpdate,
})
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(contextLimitLoaderLayer),
Layer.provide(Provider.defaultLayer),
Layer.provide(InstanceStore.defaultLayer),
)
export * as UsageService from "./usage"

View File

@@ -0,0 +1,314 @@
import { describe, expect, test } from "bun:test"
import type { SessionNotification } from "@agentclientprotocol/sdk"
import { UsageService } from "@/acp-next/usage"
import { ModelID, ProviderID } from "@/provider/schema"
import { Provider } from "@/provider/provider"
import { Effect, Layer } from "effect"
import { it } from "../lib/effect"
const assistant = (
input: Partial<UsageService.AssistantMessage> & Pick<UsageService.AssistantMessage, "cost">,
): UsageService.SessionMessage => ({
info: {
role: "assistant",
providerID: "anthropic",
modelID: "claude-sonnet",
tokens: {
input: 10,
output: 20,
reasoning: 0,
cache: { read: 0, write: 0 },
},
...input,
},
})
const user = (): UsageService.SessionMessage => ({
info: { role: "user" },
})
const assistantWithoutProvider = (): UsageService.SessionMessage => ({
info: {
role: "assistant",
modelID: "claude-sonnet",
cost: 1,
tokens: {
input: 10,
output: 20,
reasoning: 0,
cache: { read: 0, write: 0 },
},
},
})
const model = (providerID: ProviderID, modelID: ModelID, context: number): Provider.Model => ({
id: modelID,
providerID,
api: {
id: modelID,
url: "https://example.com",
npm: "@ai-sdk/openai-compatible",
},
name: modelID,
family: "test",
capabilities: {
temperature: true,
reasoning: false,
attachment: false,
toolcall: true,
input: { text: true, audio: false, image: false, video: false, pdf: false },
output: { text: true, audio: false, image: false, video: false, pdf: false },
interleaved: false,
},
cost: {
input: 0,
output: 0,
cache: { read: 0, write: 0 },
},
limit: {
context,
output: 4096,
},
status: "active",
options: {},
headers: {},
release_date: "2026-01-01",
})
const providers = (context = 128_000): Record<ProviderID, Provider.Info> => {
const providerID = ProviderID.make("anthropic")
const modelID = ModelID.make("claude-sonnet")
return {
[providerID]: {
id: providerID,
name: "Anthropic",
source: "config",
env: [],
options: {},
models: {
[modelID]: model(providerID, modelID, context),
},
},
}
}
const fakeLayer = (input: {
readonly messages?: Effect.Effect<readonly UsageService.SessionMessage[], unknown>
readonly providers?: (directory: string) => Effect.Effect<Record<ProviderID, Provider.Info>, unknown>
}) =>
UsageService.layer.pipe(
Layer.provide(
Layer.mergeAll(
Layer.succeed(
UsageService.MessageLoader,
UsageService.MessageLoader.of({
messages: () => input.messages ?? Effect.succeed([]),
}),
),
Layer.succeed(
UsageService.ContextLimitLoader,
UsageService.ContextLimitLoader.of({
providers: input.providers ?? (() => Effect.succeed(providers())),
}),
),
),
),
)
const connection = (updates: SessionNotification[]) => ({
sessionUpdate(params: SessionNotification) {
updates.push(params)
return Promise.resolve()
},
})
describe("acp-next usage", () => {
test("builds ACP Usage from assistant token shape", () => {
expect(
UsageService.buildUsage({
cost: 0.02,
tokens: {
input: 100,
output: 40,
reasoning: 7,
cache: { read: 11, write: 13 },
},
}),
).toEqual({
inputTokens: 100,
outputTokens: 40,
thoughtTokens: 7,
cachedReadTokens: 11,
cachedWriteTokens: 13,
totalTokens: 171,
})
})
test("omits optional token fields when they are zero", () => {
expect(
UsageService.buildUsage({
cost: 0,
tokens: {
input: 3,
output: 4,
reasoning: 0,
cache: { read: 0, write: 0 },
},
}),
).toEqual({
inputTokens: 3,
outputTokens: 4,
totalTokens: 7,
})
})
test("finds the latest assistant message", () => {
expect(
UsageService.latestAssistantMessage([assistant({ cost: 1, modelID: "older" }), user(), assistant({ cost: 2 })]),
).toMatchObject({ cost: 2 })
})
test("calculates total session cost from assistant messages", () => {
expect(UsageService.totalSessionCost([assistant({ cost: 1.25 }), user(), assistant({ cost: 2.5 })])).toBe(3.75)
})
it.effect("loads context limits from providers and caches by directory/provider/model", () => {
const calls: string[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
const first = yield* usage.contextLimit({
directory: "/workspace",
providerID: ProviderID.make("anthropic"),
modelID: ModelID.make("claude-sonnet"),
})
const second = yield* usage.contextLimit({
directory: "/workspace",
providerID: ProviderID.make("anthropic"),
modelID: ModelID.make("claude-sonnet"),
})
expect(first).toBe(200_000)
expect(second).toBe(200_000)
expect(calls).toEqual(["/workspace"])
}).pipe(
Effect.provide(
fakeLayer({
providers: (directory) =>
Effect.sync(() => {
calls.push(directory)
return providers(200_000)
}),
}),
),
)
})
it.effect("sends ACP usage_update with context size and cumulative assistant cost", () => {
const updates: SessionNotification[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
yield* usage.sendUpdate({
connection: connection(updates),
sessionID: "ses_1",
directory: "/workspace",
})
expect(updates).toEqual([
{
sessionId: "ses_1",
update: {
sessionUpdate: "usage_update",
used: 15,
size: 128_000,
cost: { amount: 3, currency: "USD" },
},
},
])
}).pipe(
Effect.provide(
fakeLayer({
messages: Effect.succeed([
assistant({ cost: 1 }),
assistant({
cost: 2,
tokens: {
input: 10,
output: 20,
reasoning: 0,
cache: { read: 5, write: 0 },
},
}),
]),
}),
),
)
})
it.effect("skips usage update when messages cannot be fetched", () => {
const updates: SessionNotification[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
yield* usage.sendUpdate({
connection: connection(updates),
sessionID: "ses_1",
directory: "/workspace",
})
expect(updates).toEqual([])
}).pipe(Effect.provide(fakeLayer({ messages: Effect.fail(new Error("boom")) })))
})
it.effect("skips usage update when no assistant message exists", () => {
const updates: SessionNotification[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
yield* usage.sendUpdate({
connection: connection(updates),
sessionID: "ses_1",
directory: "/workspace",
})
expect(updates).toEqual([])
}).pipe(Effect.provide(fakeLayer({ messages: Effect.succeed([user()]) })))
})
it.effect("skips usage update when assistant message has no provider or model", () => {
const updates: SessionNotification[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
yield* usage.sendUpdate({
connection: connection(updates),
sessionID: "ses_1",
directory: "/workspace",
})
expect(updates).toEqual([])
}).pipe(
Effect.provide(
fakeLayer({
messages: Effect.succeed([assistantWithoutProvider()]),
}),
),
)
})
it.effect("skips usage update when context size is unknown", () => {
const updates: SessionNotification[] = []
return Effect.gen(function* () {
const usage = yield* UsageService.Service
yield* usage.sendUpdate({
connection: connection(updates),
sessionID: "ses_1",
directory: "/workspace",
})
expect(updates).toEqual([])
}).pipe(
Effect.provide(
fakeLayer({
messages: Effect.succeed([assistant({ cost: 1, providerID: "missing" })]),
}),
),
)
})
})