Compare commits

...

12 Commits

Author SHA1 Message Date
Felipe Coury
5b04edfda9 test(app-server): validate usage read accounting 2026-05-29 13:45:27 -03:00
Felipe Coury
14b8daa140 fix(usage): pass visible tools to compaction attribution 2026-05-29 13:45:27 -03:00
Felipe Coury
e4fe4716df fix(usage): record compaction usage samples 2026-05-29 13:45:26 -03:00
Felipe Coury
8c5f7dd9da feat(app-server): expose usage report 2026-05-29 13:45:26 -03:00
Felipe Coury
3f0045c005 test(usage): validate live attribution accounting 2026-05-29 13:45:08 -03:00
Felipe Coury
83505d85cf refactor(usage): borrow prompt attribution inputs 2026-05-29 12:38:19 -03:00
Felipe Coury
8f460a1632 fix(usage): attribute merged and code-mode tool specs 2026-05-28 21:43:45 -03:00
Felipe Coury
94b3e3e5eb fix(usage): attribute visible and deferred tools correctly 2026-05-28 21:05:18 -03:00
Felipe Coury
43d4c2ee73 feat(usage): attribute token usage 2026-05-28 21:05:18 -03:00
Felipe Coury
da401bd943 fix(state): avoid usage migration version collision 2026-05-28 21:04:51 -03:00
Felipe Coury
c59b2774e3 refactor(usage): simplify local storage implementation 2026-05-28 14:29:45 -03:00
Felipe Coury
8fc9fa833b feat(usage): add local usage storage 2026-05-28 14:29:44 -03:00
47 changed files with 3208 additions and 26 deletions

View File

@@ -4092,6 +4092,24 @@
],
"type": "object"
},
"UsageRange": {
"enum": [
"day",
"week"
],
"type": "string"
},
"UsageReadParams": {
"properties": {
"range": {
"$ref": "#/definitions/UsageRange"
}
},
"required": [
"range"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{
@@ -4891,6 +4909,30 @@
"title": "Plugin/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"usage/read"
],
"title": "Usage/readRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/UsageReadParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Usage/readRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -829,6 +829,30 @@
"title": "Plugin/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"usage/read"
],
"title": "Usage/readRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/UsageReadParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Usage/readRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -18601,6 +18625,177 @@
],
"type": "object"
},
"UsageContributorKind": {
"enum": [
"skill",
"subagent",
"agentTask",
"app",
"mcpServer",
"plugin"
],
"type": "string"
},
"UsageEntry": {
"properties": {
"attributedTokens": {
"format": "int64",
"type": "integer"
},
"id": {
"type": "string"
},
"kind": {
"$ref": "#/definitions/v2/UsageContributorKind"
},
"label": {
"type": "string"
},
"percentOfUsage": {
"format": "uint8",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"attributedTokens",
"id",
"kind",
"label",
"percentOfUsage"
],
"type": "object"
},
"UsageHeadline": {
"properties": {
"entry": {
"$ref": "#/definitions/v2/UsageEntry"
},
"note": {
"type": [
"string",
"null"
]
}
},
"required": [
"entry"
],
"type": "object"
},
"UsageRange": {
"enum": [
"day",
"week"
],
"type": "string"
},
"UsageReadParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"range": {
"$ref": "#/definitions/v2/UsageRange"
}
},
"required": [
"range"
],
"title": "UsageReadParams",
"type": "object"
},
"UsageReadResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"report": {
"$ref": "#/definitions/v2/UsageReport"
}
},
"required": [
"report"
],
"title": "UsageReadResponse",
"type": "object"
},
"UsageReport": {
"properties": {
"agentTasks": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"apps": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"generatedAt": {
"format": "int64",
"type": "integer"
},
"headline": {
"anyOf": [
{
"$ref": "#/definitions/v2/UsageHeadline"
},
{
"type": "null"
}
]
},
"mcpServers": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"plugins": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"range": {
"$ref": "#/definitions/v2/UsageRange"
},
"skills": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"subagents": {
"items": {
"$ref": "#/definitions/v2/UsageEntry"
},
"type": "array"
},
"totalTokens": {
"format": "int64",
"type": "integer"
},
"trackedFrom": {
"format": "int64",
"type": [
"integer",
"null"
]
}
},
"required": [
"agentTasks",
"apps",
"generatedAt",
"mcpServers",
"plugins",
"range",
"skills",
"subagents",
"totalTokens"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{

View File

@@ -1577,6 +1577,30 @@
"title": "Plugin/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"usage/read"
],
"title": "Usage/readRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/UsageReadParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Usage/readRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -16425,6 +16449,177 @@
],
"type": "object"
},
"UsageContributorKind": {
"enum": [
"skill",
"subagent",
"agentTask",
"app",
"mcpServer",
"plugin"
],
"type": "string"
},
"UsageEntry": {
"properties": {
"attributedTokens": {
"format": "int64",
"type": "integer"
},
"id": {
"type": "string"
},
"kind": {
"$ref": "#/definitions/UsageContributorKind"
},
"label": {
"type": "string"
},
"percentOfUsage": {
"format": "uint8",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"attributedTokens",
"id",
"kind",
"label",
"percentOfUsage"
],
"type": "object"
},
"UsageHeadline": {
"properties": {
"entry": {
"$ref": "#/definitions/UsageEntry"
},
"note": {
"type": [
"string",
"null"
]
}
},
"required": [
"entry"
],
"type": "object"
},
"UsageRange": {
"enum": [
"day",
"week"
],
"type": "string"
},
"UsageReadParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"range": {
"$ref": "#/definitions/UsageRange"
}
},
"required": [
"range"
],
"title": "UsageReadParams",
"type": "object"
},
"UsageReadResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"report": {
"$ref": "#/definitions/UsageReport"
}
},
"required": [
"report"
],
"title": "UsageReadResponse",
"type": "object"
},
"UsageReport": {
"properties": {
"agentTasks": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"apps": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"generatedAt": {
"format": "int64",
"type": "integer"
},
"headline": {
"anyOf": [
{
"$ref": "#/definitions/UsageHeadline"
},
{
"type": "null"
}
]
},
"mcpServers": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"plugins": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"range": {
"$ref": "#/definitions/UsageRange"
},
"skills": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"subagents": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"totalTokens": {
"format": "int64",
"type": "integer"
},
"trackedFrom": {
"format": "int64",
"type": [
"integer",
"null"
]
}
},
"required": [
"agentTasks",
"apps",
"generatedAt",
"mcpServers",
"plugins",
"range",
"skills",
"subagents",
"totalTokens"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{

View File

@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"UsageRange": {
"enum": [
"day",
"week"
],
"type": "string"
}
},
"properties": {
"range": {
"$ref": "#/definitions/UsageRange"
}
},
"required": [
"range"
],
"title": "UsageReadParams",
"type": "object"
}

View File

@@ -0,0 +1,160 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"UsageContributorKind": {
"enum": [
"skill",
"subagent",
"agentTask",
"app",
"mcpServer",
"plugin"
],
"type": "string"
},
"UsageEntry": {
"properties": {
"attributedTokens": {
"format": "int64",
"type": "integer"
},
"id": {
"type": "string"
},
"kind": {
"$ref": "#/definitions/UsageContributorKind"
},
"label": {
"type": "string"
},
"percentOfUsage": {
"format": "uint8",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"attributedTokens",
"id",
"kind",
"label",
"percentOfUsage"
],
"type": "object"
},
"UsageHeadline": {
"properties": {
"entry": {
"$ref": "#/definitions/UsageEntry"
},
"note": {
"type": [
"string",
"null"
]
}
},
"required": [
"entry"
],
"type": "object"
},
"UsageRange": {
"enum": [
"day",
"week"
],
"type": "string"
},
"UsageReport": {
"properties": {
"agentTasks": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"apps": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"generatedAt": {
"format": "int64",
"type": "integer"
},
"headline": {
"anyOf": [
{
"$ref": "#/definitions/UsageHeadline"
},
{
"type": "null"
}
]
},
"mcpServers": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"plugins": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"range": {
"$ref": "#/definitions/UsageRange"
},
"skills": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"subagents": {
"items": {
"$ref": "#/definitions/UsageEntry"
},
"type": "array"
},
"totalTokens": {
"format": "int64",
"type": "integer"
},
"trackedFrom": {
"format": "int64",
"type": [
"integer",
"null"
]
}
},
"required": [
"agentTasks",
"apps",
"generatedAt",
"mcpServers",
"plugins",
"range",
"skills",
"subagents",
"totalTokens"
],
"type": "object"
}
},
"properties": {
"report": {
"$ref": "#/definitions/UsageReport"
}
},
"required": [
"report"
],
"title": "UsageReadResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type UsageContributorKind = "skill" | "subagent" | "agentTask" | "app" | "mcpServer" | "plugin";

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UsageContributorKind } from "./UsageContributorKind";
export type UsageEntry = { kind: UsageContributorKind, id: string, label: string, attributedTokens: number, percentOfUsage: number, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UsageEntry } from "./UsageEntry";
export type UsageHeadline = { entry: UsageEntry, note: string | null, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type UsageRange = "day" | "week";

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UsageRange } from "./UsageRange";
export type UsageReadParams = { range: UsageRange, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UsageReport } from "./UsageReport";
export type UsageReadResponse = { report: UsageReport, };

View File

@@ -0,0 +1,8 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UsageEntry } from "./UsageEntry";
import type { UsageHeadline } from "./UsageHeadline";
import type { UsageRange } from "./UsageRange";
export type UsageReport = { range: UsageRange, generatedAt: number, trackedFrom: number | null, totalTokens: number, headline: UsageHeadline | null, skills: Array<UsageEntry>, subagents: Array<UsageEntry>, agentTasks: Array<UsageEntry>, apps: Array<UsageEntry>, mcpServers: Array<UsageEntry>, plugins: Array<UsageEntry>, };

View File

@@ -452,6 +452,13 @@ export type { TurnStatus } from "./TurnStatus";
export type { TurnSteerParams } from "./TurnSteerParams";
export type { TurnSteerResponse } from "./TurnSteerResponse";
export type { TurnsPage } from "./TurnsPage";
export type { UsageContributorKind } from "./UsageContributorKind";
export type { UsageEntry } from "./UsageEntry";
export type { UsageHeadline } from "./UsageHeadline";
export type { UsageRange } from "./UsageRange";
export type { UsageReadParams } from "./UsageReadParams";
export type { UsageReadResponse } from "./UsageReadResponse";
export type { UsageReport } from "./UsageReport";
export type { UserInput } from "./UserInput";
export type { WarningNotification } from "./WarningNotification";
export type { WebSearchAction } from "./WebSearchAction";

View File

@@ -635,6 +635,11 @@ client_request_definitions! {
serialization: None,
response: v2::PluginListResponse,
},
UsageRead => "usage/read" {
params: v2::UsageReadParams,
serialization: None,
response: v2::UsageReadResponse,
},
PluginInstalled => "plugin/installed" {
params: v2::PluginInstalledParams,
serialization: None,

View File

@@ -24,6 +24,7 @@ mod review;
mod thread;
mod thread_data;
mod turn;
mod usage;
mod windows_sandbox;
pub use account::*;
@@ -51,6 +52,7 @@ pub use shared::*;
pub use thread::*;
pub use thread_data::*;
pub use turn::*;
pub use usage::*;
pub use windows_sandbox::*;
#[cfg(test)]

View File

@@ -0,0 +1,92 @@
use codex_protocol::protocol::UsageContributorKind as CoreUsageContributorKind;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase", export_to = "v2/")]
pub enum UsageRange {
Day,
Week,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct UsageReadParams {
pub range: UsageRange,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct UsageEntry {
pub kind: UsageContributorKind,
pub id: String,
pub label: String,
#[ts(type = "number")]
pub attributed_tokens: i64,
pub percent_of_usage: u8,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct UsageHeadline {
pub entry: UsageEntry,
pub note: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct UsageReport {
pub range: UsageRange,
#[ts(type = "number")]
pub generated_at: i64,
#[ts(type = "number | null")]
pub tracked_from: Option<i64>,
#[ts(type = "number")]
pub total_tokens: i64,
pub headline: Option<UsageHeadline>,
pub skills: Vec<UsageEntry>,
pub subagents: Vec<UsageEntry>,
pub agent_tasks: Vec<UsageEntry>,
pub apps: Vec<UsageEntry>,
pub mcp_servers: Vec<UsageEntry>,
pub plugins: Vec<UsageEntry>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct UsageReadResponse {
pub report: UsageReport,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase", export_to = "v2/")]
pub enum UsageContributorKind {
Skill,
Subagent,
AgentTask,
App,
McpServer,
Plugin,
}
impl From<CoreUsageContributorKind> for UsageContributorKind {
fn from(value: CoreUsageContributorKind) -> Self {
match value {
CoreUsageContributorKind::Skill => Self::Skill,
CoreUsageContributorKind::Subagent => Self::Subagent,
CoreUsageContributorKind::AgentTask => Self::AgentTask,
CoreUsageContributorKind::App => Self::App,
CoreUsageContributorKind::McpServer => Self::McpServer,
CoreUsageContributorKind::Plugin => Self::Plugin,
}
}
}

View File

@@ -206,6 +206,7 @@ Example with notification opt-out:
- `plugin/skill/read` — read remote plugin skill markdown on demand by `remoteMarketplaceName`, `remotePluginId`, and `skillName`. This lets clients preview uninstalled remote plugin skills without downloading the plugin bundle.
- `skills/changed` — notification emitted when watched local skill files change.
- `app/list` — list available apps.
- `usage/read` — read forward-only local token usage for a rolling `day` or `week` range, grouped by skills, subagents, agent tasks, apps, MCP servers, and plugins when Codex has tracked those contributors in sqlite.
- `remoteControl/enable` — experimental; enable remote control for the current app-server process and return the current remote-control status snapshot. The caller is responsible for persisting the desired setting outside app-server.
- `remoteControl/disable` — experimental; disable remote control for the current app-server process and return the current remote-control status snapshot. This does not revoke already enrolled controller devices.
- `remoteControl/status/read` — experimental; read the current remote-control status snapshot. `status` is one of `disabled`, `connecting`, `connected`, or `errored`; `serverName` is the local machine name used by this app-server process; `environmentId` is a string when the app-server has a current enrollment and `null` when that enrollment is cleared, invalidated, or remote control is disabled.

View File

@@ -36,6 +36,7 @@ use crate::request_processors::SearchRequestProcessor;
use crate::request_processors::ThreadGoalRequestProcessor;
use crate::request_processors::ThreadRequestProcessor;
use crate::request_processors::TurnRequestProcessor;
use crate::request_processors::UsageRequestProcessor;
use crate::request_processors::WindowsSandboxRequestProcessor;
use crate::request_serialization::QueuedInitializedRequest;
use crate::request_serialization::RequestSerializationQueueKey;
@@ -182,6 +183,7 @@ pub(crate) struct MessageProcessor {
thread_goal_processor: ThreadGoalRequestProcessor,
thread_processor: ThreadRequestProcessor,
turn_processor: TurnRequestProcessor,
usage_processor: UsageRequestProcessor,
windows_sandbox_processor: WindowsSandboxRequestProcessor,
request_serialization_queues: RequestSerializationQueues,
}
@@ -428,9 +430,10 @@ impl MessageProcessor {
thread_watch_manager.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db,
state_db.clone(),
Arc::clone(&skills_watcher),
);
let usage_processor = UsageRequestProcessor::new(state_db);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
@@ -507,6 +510,7 @@ impl MessageProcessor {
thread_goal_processor,
thread_processor,
turn_processor,
usage_processor,
windows_sandbox_processor,
request_serialization_queues: RequestSerializationQueues::default(),
}
@@ -1123,6 +1127,11 @@ impl MessageProcessor {
ClientRequest::PluginList { params, .. } => {
self.plugin_processor.plugin_list(params).await
}
ClientRequest::UsageRead { params, .. } => self
.usage_processor
.usage_read(params)
.await
.map(|response| Some(response.into())),
ClientRequest::PluginInstalled { params, .. } => {
self.plugin_processor.plugin_installed(params).await
}

View File

@@ -474,6 +474,7 @@ mod search;
mod thread_processor;
mod token_usage_replay;
mod turn_processor;
mod usage_processor;
mod windows_sandbox_processor;
pub(crate) use account_processor::AccountRequestProcessor;
@@ -496,6 +497,7 @@ pub(crate) use search::SearchRequestProcessor;
pub(crate) use thread_goal_processor::ThreadGoalRequestProcessor;
pub(crate) use thread_processor::ThreadRequestProcessor;
pub(crate) use turn_processor::TurnRequestProcessor;
pub(crate) use usage_processor::UsageRequestProcessor;
pub(crate) use windows_sandbox_processor::WindowsSandboxRequestProcessor;
use crate::error_code::internal_error;

View File

@@ -0,0 +1,77 @@
use super::*;
use crate::error_code::internal_error;
use chrono::Utc;
use codex_app_server_protocol::UsageEntry;
use codex_app_server_protocol::UsageHeadline;
use codex_app_server_protocol::UsageRange;
use codex_app_server_protocol::UsageReadParams;
use codex_app_server_protocol::UsageReadResponse;
use codex_app_server_protocol::UsageReport;
use codex_rollout::StateDbHandle;
#[derive(Clone)]
pub(crate) struct UsageRequestProcessor {
state_db: Option<StateDbHandle>,
}
impl UsageRequestProcessor {
pub(crate) fn new(state_db: Option<StateDbHandle>) -> Self {
Self { state_db }
}
pub(crate) async fn usage_read(
&self,
params: UsageReadParams,
) -> Result<UsageReadResponse, JSONRPCErrorError> {
let state_db = self
.state_db
.as_ref()
.ok_or_else(|| internal_error("sqlite state db unavailable for usage"))?;
let report = state_db
.read_usage_report(state_usage_range(params.range), Utc::now().timestamp())
.await
.map_err(|err| internal_error(format!("failed to read usage report: {err}")))?;
Ok(UsageReadResponse {
report: UsageReport {
range: api_usage_range(report.range),
generated_at: report.generated_at,
tracked_from: report.tracked_from,
total_tokens: report.total_tokens,
headline: report.headline.map(|headline| UsageHeadline {
entry: usage_entry(headline.entry),
note: headline.note,
}),
skills: report.skills.into_iter().map(usage_entry).collect(),
subagents: report.subagents.into_iter().map(usage_entry).collect(),
agent_tasks: report.agent_tasks.into_iter().map(usage_entry).collect(),
apps: report.apps.into_iter().map(usage_entry).collect(),
mcp_servers: report.mcp_servers.into_iter().map(usage_entry).collect(),
plugins: report.plugins.into_iter().map(usage_entry).collect(),
},
})
}
}
fn usage_entry(entry: codex_state::UsageEntry) -> UsageEntry {
UsageEntry {
kind: entry.kind.into(),
id: entry.id,
label: entry.label,
attributed_tokens: entry.attributed_tokens,
percent_of_usage: entry.percent_of_usage,
}
}
fn state_usage_range(value: UsageRange) -> codex_state::UsageRange {
match value {
UsageRange::Day => codex_state::UsageRange::Day,
UsageRange::Week => codex_state::UsageRange::Week,
}
}
fn api_usage_range(value: codex_state::UsageRange) -> UsageRange {
match value {
codex_state::UsageRange::Day => UsageRange::Day,
codex_state::UsageRange::Week => UsageRange::Week,
}
}

View File

@@ -101,6 +101,7 @@ use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::UsageReadParams;
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
use codex_login::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
use tokio::process::Command;
@@ -818,6 +819,15 @@ impl McpProcess {
self.send_request("turn/start", params).await
}
/// Send a `usage/read` JSON-RPC request (v2).
pub async fn send_usage_read_request(
&mut self,
params: UsageReadParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("usage/read", params).await
}
/// Send a `thread/inject_items` JSON-RPC request (v2).
pub async fn send_thread_inject_items_request(
&mut self,

View File

@@ -68,5 +68,6 @@ mod turn_interrupt;
mod turn_start;
mod turn_start_zsh_fork;
mod turn_steer;
mod usage;
mod web_search;
mod windows_sandbox_setup;

View File

@@ -0,0 +1,153 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UsageRange;
use codex_app_server_protocol::UsageReadParams;
use codex_app_server_protocol::UsageReadResponse;
use codex_app_server_protocol::UserInput;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn usage_read_returns_metrics_recorded_by_a_real_turn() -> Result<()> {
let server = responses::start_mock_server().await;
responses::mount_sse_once(
&server,
responses::sse(vec![
responses::ev_response_created("resp-usage"),
responses::ev_assistant_message("msg-usage", "Done"),
responses::ev_completed_with_tokens("resp-usage", /*total_tokens*/ 500),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let skill_path = codex_home
.path()
.join(".agents/skills/usage-rpc-e2e/SKILL.md");
std::fs::create_dir_all(skill_path.parent().expect("skill path should have parent"))?;
std::fs::write(
&skill_path,
"---\nname: usage-rpc-e2e\ndescription: RPC usage test\n---\n\nRecord this skill.\n",
)?;
let skill_path = skill_path.canonicalize()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let empty_request = mcp
.send_usage_read_request(UsageReadParams {
range: UsageRange::Day,
})
.await?;
let empty_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(empty_request)),
)
.await??;
let empty_report = to_response::<UsageReadResponse>(empty_response)?.report;
assert_eq!(empty_report.total_tokens, 0);
assert!(empty_report.skills.is_empty());
let thread_request = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_request)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_response)?;
let turn_request = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![
UserInput::Text {
text: "Use $usage-rpc-e2e".to_string(),
text_elements: Vec::new(),
},
UserInput::Skill {
name: "usage-rpc-e2e".to_string(),
path: skill_path.clone(),
},
],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_request)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let daily = read_usage(&mut mcp, UsageRange::Day).await?;
assert_eq!(daily.report.range, UsageRange::Day);
assert_eq!(daily.report.total_tokens, 500);
assert!(daily.report.tracked_from.is_some());
assert_eq!(daily.report.skills.len(), 1);
assert_eq!(daily.report.skills[0].label, "usage-rpc-e2e");
assert!(daily.report.skills[0].attributed_tokens > 0);
let weekly = read_usage(&mut mcp, UsageRange::Week).await?;
assert_eq!(weekly.report.range, UsageRange::Week);
assert_eq!(weekly.report.total_tokens, 500);
assert_eq!(weekly.report.skills, daily.report.skills);
Ok(())
}
async fn read_usage(mcp: &mut McpProcess, range: UsageRange) -> Result<UsageReadResponse> {
let request = mcp
.send_usage_read_request(UsageReadParams { range })
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request)),
)
.await??;
to_response::<UsageReadResponse>(response)
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[features]
sqlite = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -1,3 +1,4 @@
use crate::usage::UsagePromptAttribution;
pub use codex_api::ResponseEvent;
use codex_config::types::Personality;
use codex_protocol::error::Result;
@@ -33,6 +34,8 @@ pub struct Prompt {
/// Whether parallel tool calls are permitted for this prompt.
pub(crate) parallel_tool_calls: bool,
pub(crate) usage_attribution: UsagePromptAttribution,
pub base_instructions: BaseInstructions,
/// Optionally specify the personality of the model.
@@ -51,6 +54,7 @@ impl Default for Prompt {
input: Vec::new(),
tools: Vec::new(),
parallel_tool_calls: false,
usage_attribution: UsagePromptAttribution::default(),
base_instructions: BaseInstructions::default(),
personality: None,
output_schema: None,

View File

@@ -194,6 +194,7 @@ async fn run_remote_compact_task_inner_impl(
input: prompt_input,
tools: tool_router.model_visible_specs(),
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
usage_attribution: Default::default(),
base_instructions,
personality: turn_context.personality,
output_schema: None,

View File

@@ -35,6 +35,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TruncationPolicy;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rollout_trace::CompactionCheckpointTracePayload;
@@ -207,10 +208,18 @@ async fn run_remote_compact_task_inner_impl(
.await?;
let mut input = prompt_input.clone();
input.push(ResponseItem::CompactionTrigger);
let tools = tool_router.model_visible_specs();
let usage_attribution = crate::usage::UsagePromptAttribution::from_prompt(
&input,
&tools,
&tool_router,
base_instructions.text.as_str(),
);
let prompt = Prompt {
input,
tools: tool_router.model_visible_specs(),
tools,
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
usage_attribution,
base_instructions,
personality: turn_context.personality,
output_schema: None,
@@ -248,9 +257,16 @@ async fn run_remote_compact_task_inner_impl(
trace_attempt.record_result(
compaction_output_result
.as_ref()
.map(|(item, _)| std::slice::from_ref(item)),
.map(|(item, _, _)| std::slice::from_ref(item)),
);
let (compaction_output, response_id) = compaction_output_result?;
let (compaction_output, response_id, token_usage) = compaction_output_result?;
sess.record_usage_attribution(
turn_context,
&prompt,
response_id.as_str(),
token_usage.as_ref(),
)
.await;
let compacted_history = build_v2_compacted_history(&prompt_input, compaction_output);
let new_history = process_compacted_history(
sess.as_ref(),
@@ -293,7 +309,7 @@ async fn run_remote_compaction_request_v2(
client_session: &mut ModelClientSession,
prompt: &Prompt,
turn_metadata_header: Option<&str>,
) -> CodexResult<(ResponseItem, String)> {
) -> CodexResult<(ResponseItem, String, Option<TokenUsage>)> {
let max_retries = turn_context
.provider
.info()
@@ -363,11 +379,12 @@ async fn log_remote_compaction_request_failure(
async fn collect_compaction_output(
mut stream: ResponseStream,
) -> CodexResult<(ResponseItem, String)> {
) -> CodexResult<(ResponseItem, String, Option<TokenUsage>)> {
let mut output_item_count = 0usize;
let mut compaction_count = 0usize;
let mut compaction_output = None;
let mut completed_response_id = None;
let mut completed_token_usage = None;
while let Some(event) = stream.next().await {
match event? {
ResponseEvent::OutputItemDone(item) => {
@@ -379,8 +396,13 @@ async fn collect_compaction_output(
}
}
}
ResponseEvent::Completed { response_id, .. } => {
ResponseEvent::Completed {
response_id,
token_usage,
..
} => {
completed_response_id = Some(response_id);
completed_token_usage = token_usage;
break;
}
_ => {}
@@ -403,7 +425,7 @@ async fn collect_compaction_output(
let Some(compaction_output) = compaction_output else {
unreachable!("compaction output must exist when count is exactly one");
};
Ok((compaction_output, response_id))
Ok((compaction_output, response_id, completed_token_usage))
}
fn build_v2_compacted_history(
@@ -727,6 +749,13 @@ mod tests {
let compaction = ResponseItem::Compaction {
encrypted_content: "encrypted".to_string(),
};
let expected_token_usage = TokenUsage {
input_tokens: 100,
cached_input_tokens: 25,
output_tokens: 10,
reasoning_output_tokens: 0,
total_tokens: 110,
};
let stream = response_stream(vec![
Ok(ResponseEvent::OutputItemDone(message(
"assistant",
@@ -736,16 +765,17 @@ mod tests {
Ok(ResponseEvent::OutputItemDone(compaction.clone())),
Ok(ResponseEvent::Completed {
response_id: "resp-compact".to_string(),
token_usage: None,
token_usage: Some(expected_token_usage.clone()),
end_turn: Some(true),
}),
]);
let (output, response_id) = collect_compaction_output(stream)
let (output, response_id, token_usage) = collect_compaction_output(stream)
.await
.expect("compaction should be collected");
assert_eq!(output, compaction);
assert_eq!(response_id, "resp-compact");
assert_eq!(token_usage, Some(expected_token_usage));
}
}

View File

@@ -99,6 +99,7 @@ pub(crate) use skills::skills_load_input_from_config;
mod stream_events_utils;
pub mod test_support;
mod unified_exec;
mod usage;
pub mod windows_sandbox;
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
pub use codex_protocol::config_types::ModelProviderAuthInfo;

View File

@@ -2947,6 +2947,36 @@ impl Session {
}
}
pub(crate) async fn record_usage_attribution(
&self,
turn_context: &TurnContext,
prompt: &crate::client_common::Prompt,
response_id: &str,
token_usage: Option<&TokenUsage>,
) {
let Some(token_usage) = token_usage else {
return;
};
let occurred_at = chrono::Utc::now().timestamp();
let attribution = prompt.usage_attribution.complete(
format!("{}:{response_id}", self.conversation_id),
turn_context.sub_id.clone(),
response_id.to_string(),
occurred_at,
token_usage.clone(),
);
if let Some(state_db) = self.state_db()
&& let Err(err) = state_db
.record_usage_sample(&codex_state::UsageSample {
thread_id: self.conversation_id,
attribution,
})
.await
{
warn!("failed to persist usage sample: {err}");
}
}
pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) {
let history = self.clone_history().await;
let base_instructions = self.get_base_instructions().await;

View File

@@ -900,10 +900,18 @@ pub(crate) fn build_prompt(
turn_context: &TurnContext,
base_instructions: BaseInstructions,
) -> Prompt {
let tools = router.model_visible_specs();
let usage_attribution = crate::usage::UsagePromptAttribution::from_prompt(
&input,
&tools,
router,
base_instructions.text.as_str(),
);
Prompt {
input,
tools: router.model_visible_specs(),
tools,
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
usage_attribution,
base_instructions,
personality: turn_context.personality,
output_schema: turn_context.final_output_json_schema.clone(),
@@ -2006,6 +2014,13 @@ async fn try_run_sampling_request(
&mut assistant_message_stream_parsers,
)
.await;
sess.record_usage_attribution(
&turn_context,
prompt,
response_id.as_str(),
token_usage.as_ref(),
)
.await;
sess.record_token_usage_info(&turn_context, token_usage.as_ref())
.await;
should_emit_token_count = true;

View File

@@ -5,6 +5,7 @@ use crate::tools::context::ToolPayload;
use crate::tools::context::boxed_tool_output;
use crate::tools::registry::CoreToolRuntime;
use crate::tools::registry::ToolExecutor;
use codex_protocol::protocol::UsageContributor;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
@@ -16,13 +17,19 @@ use super::is_exec_tool_name;
pub struct CodeModeExecuteHandler {
spec: ToolSpec,
nested_tool_specs: Vec<ToolSpec>,
usage_contributors: Vec<UsageContributor>,
}
impl CodeModeExecuteHandler {
pub(crate) fn new(spec: ToolSpec, nested_tool_specs: Vec<ToolSpec>) -> Self {
pub(crate) fn new(
spec: ToolSpec,
nested_tool_specs: Vec<ToolSpec>,
usage_contributors: Vec<UsageContributor>,
) -> Self {
Self {
spec,
nested_tool_specs,
usage_contributors,
}
}
@@ -117,6 +124,10 @@ impl ToolExecutor<ToolInvocation> for CodeModeExecuteHandler {
}
impl CoreToolRuntime for CodeModeExecuteHandler {
fn usage_contributors(&self) -> Vec<UsageContributor> {
self.usage_contributors.clone()
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Custom { .. })
}

View File

@@ -17,6 +17,8 @@ use crate::tools::registry::ToolExecutor;
use crate::tools::registry::ToolTelemetryTags;
use crate::tools::tool_search_entry::ToolSearchInfo;
use codex_mcp::ToolInfo;
use codex_protocol::protocol::UsageContributor;
use codex_protocol::protocol::UsageContributorKind;
use codex_tools::ResponsesApiNamespace;
use codex_tools::ResponsesApiNamespaceTool;
use codex_tools::ToolName;
@@ -131,6 +133,38 @@ impl ToolExecutor<ToolInvocation> for McpHandler {
}
impl CoreToolRuntime for McpHandler {
fn usage_contributors(&self) -> Vec<UsageContributor> {
let mut contributors = Vec::new();
if let Some(connector_id) = self.tool_info.connector_id.as_ref() {
contributors.push(UsageContributor {
kind: UsageContributorKind::App,
id: connector_id.clone(),
label: self
.tool_info
.connector_name
.clone()
.unwrap_or_else(|| connector_id.clone()),
});
} else {
contributors.push(UsageContributor {
kind: UsageContributorKind::McpServer,
id: self.tool_info.server_name.clone(),
label: self.tool_info.server_name.clone(),
});
}
contributors.extend(
self.tool_info
.plugin_display_names
.iter()
.map(|plugin_name| UsageContributor {
kind: UsageContributorKind::Plugin,
id: plugin_name.clone(),
label: plugin_name.clone(),
}),
);
contributors
}
fn search_info(&self) -> Option<ToolSearchInfo> {
let source_name = self
.tool_info

View File

@@ -423,6 +423,8 @@ mod tests {
let router = Arc::new(ToolRouter::from_parts(
ToolRegistry::from_tools([handler]),
Vec::new(),
Vec::new(),
std::collections::HashMap::new(),
));
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let runtime = ToolCallRuntime::new(router, session, turn_context, tracker);
@@ -495,6 +497,8 @@ mod tests {
let router = Arc::new(ToolRouter::from_parts(
ToolRegistry::from_tools([handler]),
Vec::new(),
Vec::new(),
std::collections::HashMap::new(),
));
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let runtime = ToolCallRuntime::new(router, session, turn_context, tracker);

View File

@@ -30,6 +30,7 @@ use codex_extension_api::ToolCallOutcome;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::UsageContributor;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use futures::future::BoxFuture;
@@ -104,6 +105,10 @@ pub(crate) trait CoreToolRuntime: ToolExecutor<ToolInvocation> {
})
}
fn usage_contributors(&self) -> Vec<UsageContributor> {
Vec::new()
}
fn pre_tool_use_payload(&self, invocation: &ToolInvocation) -> Option<PreToolUsePayload> {
let ToolPayload::Function { arguments } = &invocation.payload else {
return None;
@@ -306,6 +311,10 @@ impl CoreToolRuntime for ExposureOverride {
self.handler.post_tool_use_payload(invocation, result)
}
fn usage_contributors(&self) -> Vec<UsageContributor> {
self.handler.usage_contributors()
}
fn with_updated_hook_input(
&self,
invocation: ToolInvocation,

View File

@@ -17,6 +17,7 @@ use codex_tools::ToolCall as ExtensionToolCall;
use codex_tools::ToolExecutor;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio_util::sync::CancellationToken;
@@ -34,6 +35,9 @@ pub struct ToolCall {
pub struct ToolRouter {
registry: ToolRegistry,
model_visible_specs: Vec<ToolSpec>,
usage_contributors: Vec<crate::usage::UsagePromptContributor>,
usage_contributors_by_tool_name:
HashMap<ToolName, Vec<codex_protocol::protocol::UsageContributor>>,
}
pub(crate) struct ToolRouterParams<'a> {
@@ -49,10 +53,20 @@ impl ToolRouter {
build_tool_router(turn_context, params)
}
pub(crate) fn from_parts(registry: ToolRegistry, model_visible_specs: Vec<ToolSpec>) -> Self {
pub(crate) fn from_parts(
registry: ToolRegistry,
model_visible_specs: Vec<ToolSpec>,
usage_contributors: Vec<crate::usage::UsagePromptContributor>,
usage_contributors_by_tool_name: HashMap<
ToolName,
Vec<codex_protocol::protocol::UsageContributor>,
>,
) -> Self {
Self {
registry,
model_visible_specs,
usage_contributors,
usage_contributors_by_tool_name,
}
}
@@ -60,6 +74,19 @@ impl ToolRouter {
self.model_visible_specs.clone()
}
pub(crate) fn usage_contributors(&self) -> &[crate::usage::UsagePromptContributor] {
&self.usage_contributors
}
pub(crate) fn usage_contributors_for_tool_name(
&self,
tool_name: &ToolName,
) -> &[codex_protocol::protocol::UsageContributor] {
self.usage_contributors_by_tool_name
.get(tool_name)
.map_or(&[], Vec::as_slice)
}
#[cfg(test)]
pub(crate) fn registered_tool_names_for_test(&self) -> Vec<ToolName> {
self.registry.tool_names_for_test()

View File

@@ -62,6 +62,7 @@ use codex_protocol::openai_models::ConfigShellToolType;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::UsageContributor;
use codex_tools::DiscoverableTool;
use codex_tools::ResponsesApiNamespace;
use codex_tools::ResponsesApiNamespaceTool;
@@ -80,6 +81,7 @@ use codex_tools::request_user_input_available_modes;
use codex_tools::shell_command_backend_for_features;
use codex_tools::shell_type_for_model_and_features;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::warn;
@@ -146,14 +148,25 @@ pub(crate) fn build_tool_router(
turn_context: &TurnContext,
params: ToolRouterParams<'_>,
) -> ToolRouter {
let (model_visible_specs, registry) = build_tool_specs_and_registry(turn_context, params);
ToolRouter::from_parts(registry, model_visible_specs)
let (model_visible_specs, registry, usage_contributors, usage_contributors_by_tool_name) =
build_tool_specs_and_registry(turn_context, params);
ToolRouter::from_parts(
registry,
model_visible_specs,
usage_contributors,
usage_contributors_by_tool_name,
)
}
fn build_tool_specs_and_registry(
turn_context: &TurnContext,
params: ToolRouterParams<'_>,
) -> (Vec<ToolSpec>, ToolRegistry) {
) -> (
Vec<ToolSpec>,
ToolRegistry,
Vec<crate::usage::UsagePromptContributor>,
HashMap<ToolName, Vec<codex_protocol::protocol::UsageContributor>>,
) {
let ToolRouterParams {
mcp_tools,
deferred_mcp_tools,
@@ -183,12 +196,18 @@ fn build_tool_specs_and_registry(
fn build_model_visible_specs_and_registry(
turn_context: &TurnContext,
planned_tools: PlannedTools,
) -> (Vec<ToolSpec>, ToolRegistry) {
) -> (
Vec<ToolSpec>,
ToolRegistry,
Vec<crate::usage::UsagePromptContributor>,
HashMap<ToolName, Vec<codex_protocol::protocol::UsageContributor>>,
) {
let PlannedTools {
runtimes,
hosted_specs,
} = planned_tools;
let mut specs = Vec::new();
let mut usage_contributors_by_tool_name = HashMap::new();
let mut seen_tool_names = HashSet::new();
for runtime in &runtimes {
let tool_name = runtime.tool_name();
@@ -196,10 +215,18 @@ fn build_model_visible_specs_and_registry(
continue;
}
let exposure = runtime.exposure();
let runtime_usage_contributors = runtime.usage_contributors();
if !runtime_usage_contributors.is_empty() {
usage_contributors_by_tool_name
.insert(tool_name.clone(), runtime_usage_contributors.clone());
}
if exposure.is_direct() && !is_hidden_by_code_mode_only(turn_context, &tool_name, exposure)
{
let spec = runtime.spec();
specs.push(spec_for_model_request(turn_context, exposure, spec));
let spec = spec_for_model_request(turn_context, exposure, runtime.spec());
if !namespace_tools_enabled(turn_context) && matches!(spec, ToolSpec::Namespace(_)) {
continue;
}
specs.push(spec);
}
}
for spec in hosted_specs {
@@ -213,14 +240,65 @@ fn build_model_visible_specs_and_registry(
}
let registry = ToolRegistry::from_tools(runtimes);
let model_visible_specs = merge_into_namespaces(specs)
let model_visible_specs: Vec<ToolSpec> = merge_into_namespaces(specs)
.into_iter()
.filter(|spec| {
namespace_tools_enabled(turn_context) || !matches!(spec, ToolSpec::Namespace(_))
})
.collect();
let usage_contributors =
prompt_usage_contributors(&model_visible_specs, &usage_contributors_by_tool_name);
(model_visible_specs, registry)
(
model_visible_specs,
registry,
usage_contributors,
usage_contributors_by_tool_name,
)
}
fn prompt_usage_contributors(
specs: &[ToolSpec],
contributors_by_tool_name: &HashMap<ToolName, Vec<UsageContributor>>,
) -> Vec<crate::usage::UsagePromptContributor> {
let mut prompt_contributors = Vec::new();
for spec in specs {
let mut spec_contributors = Vec::new();
match spec {
ToolSpec::Namespace(namespace) => {
for tool in &namespace.tools {
let ResponsesApiNamespaceTool::Function(tool) = tool;
let tool_name = ToolName::namespaced(&namespace.name, &tool.name);
if let Some(contributors) = contributors_by_tool_name.get(&tool_name) {
for contributor in contributors {
if !spec_contributors.contains(contributor) {
spec_contributors.push(contributor.clone());
}
}
}
}
}
ToolSpec::Function(_)
| ToolSpec::ToolSearch { .. }
| ToolSpec::ImageGeneration { .. }
| ToolSpec::WebSearch { .. }
| ToolSpec::Freeform(_) => {
if let Some(contributors) =
contributors_by_tool_name.get(&ToolName::plain(spec.name()))
{
spec_contributors.extend(contributors.iter().cloned());
}
}
}
let source_estimated_tokens = crate::usage::estimate_serialized_tokens(spec);
prompt_contributors.extend(spec_contributors.into_iter().map(|contributor| {
crate::usage::UsagePromptContributor {
contributor,
source_estimated_tokens,
}
}));
}
prompt_contributors
}
fn spec_for_model_request(
@@ -392,6 +470,7 @@ fn build_code_mode_executors(
let mut code_mode_nested_tool_specs = Vec::new();
let mut exec_prompt_tool_specs = Vec::new();
let mut exec_prompt_usage_contributors = Vec::new();
for executor in executors {
let exposure = executor.exposure();
if exposure == ToolExposure::DirectModelOnly {
@@ -405,6 +484,11 @@ fn build_code_mode_executors(
if exposure != ToolExposure::Deferred {
exec_prompt_tool_specs.push(spec.clone());
for contributor in executor.usage_contributors() {
if !exec_prompt_usage_contributors.contains(&contributor) {
exec_prompt_usage_contributors.push(contributor);
}
}
}
code_mode_nested_tool_specs.push(spec);
}
@@ -424,6 +508,7 @@ fn build_code_mode_executors(
deferred_tools_available,
),
code_mode_nested_tool_specs,
exec_prompt_usage_contributors,
)),
Arc::new(CodeModeWaitHandler),
]

View File

@@ -48,6 +48,8 @@ struct ToolPlanProbe {
visible_specs: Vec<ToolSpec>,
visible_names: Vec<String>,
namespace_functions: BTreeMap<String, Vec<String>>,
prompt_usage_contributor_labels: Vec<String>,
tool_usage_contributor_labels: BTreeMap<String, Vec<String>>,
registered_names: Vec<String>,
exposures: BTreeMap<String, ToolExposure>,
}
@@ -84,6 +86,26 @@ impl ToolPlanProbe {
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let prompt_usage_contributor_labels = router
.usage_contributors()
.iter()
.map(|contributor| contributor.contributor.label.clone())
.collect::<Vec<_>>();
let tool_usage_contributor_labels = registered_tool_names
.iter()
.filter_map(|name| {
let contributors = router.usage_contributors_for_tool_name(name);
(!contributors.is_empty()).then(|| {
(
name.to_string(),
contributors
.iter()
.map(|contributor| contributor.label.clone())
.collect::<Vec<_>>(),
)
})
})
.collect::<BTreeMap<_, _>>();
let exposures = registered_tool_names
.iter()
.filter_map(|name| {
@@ -97,6 +119,8 @@ impl ToolPlanProbe {
visible_specs,
visible_names,
namespace_functions,
prompt_usage_contributor_labels,
tool_usage_contributor_labels,
registered_names,
exposures,
}
@@ -153,6 +177,12 @@ impl ToolPlanProbe {
.map_or(&[], Vec::as_slice)
}
fn tool_usage_contributor_labels(&self, tool_name: &str) -> &[String] {
self.tool_usage_contributor_labels
.get(tool_name)
.map_or(&[], Vec::as_slice)
}
fn visible_spec(&self, name: &str) -> &ToolSpec {
self.visible_specs
.iter()
@@ -490,6 +520,26 @@ async fn mcp_and_tool_search_follow_direct_and_deferred_tool_exposure() {
direct_mcp.namespace_function_names("mcp__direct"),
&["lookup".to_string()]
);
assert_eq!(
direct_mcp.prompt_usage_contributor_labels,
vec!["direct".to_string()]
);
let merged_mcp = probe_with(
|_| {},
ToolPlanInputs {
mcp_tools: Some(vec![
mcp_tool("merged", "mcp__merged", "lookup"),
mcp_tool("merged", "mcp__merged", "search"),
]),
..ToolPlanInputs::default()
},
)
.await;
assert_eq!(
merged_mcp.prompt_usage_contributor_labels,
vec!["merged".to_string()]
);
let searchable_mcp = ToolPlanInputs {
deferred_mcp_tools: Some(vec![mcp_tool("searchable", "mcp__searchable", "lookup")]),
@@ -533,6 +583,22 @@ async fn mcp_and_tool_search_follow_direct_and_deferred_tool_exposure() {
.await;
bedrock_namespace_capability.assert_visible_contains(&["tool_search"]);
let bedrock_namespace_spec = probe_with(
|turn| {
use_bedrock_provider(turn);
},
ToolPlanInputs {
mcp_tools: Some(vec![mcp_tool("bedrock", "mcp__bedrock__", "lookup")]),
..ToolPlanInputs::default()
},
)
.await;
bedrock_namespace_spec.assert_visible_contains(&["mcp__bedrock__"]);
assert_eq!(
bedrock_namespace_spec.prompt_usage_contributor_labels,
vec!["bedrock".to_string()]
);
let enabled = probe_with(
|turn| {
turn.model_info.supports_search_tool = true;
@@ -541,10 +607,12 @@ async fn mcp_and_tool_search_follow_direct_and_deferred_tool_exposure() {
)
.await;
enabled.assert_visible_contains(&["tool_search"]);
enabled.assert_registered_contains(&[
"tool_search",
&ToolName::namespaced("mcp__searchable", "lookup").to_string(),
]);
let searchable_tool_name = ToolName::namespaced("mcp__searchable", "lookup").to_string();
enabled.assert_registered_contains(&["tool_search", searchable_tool_name.as_str()]);
assert_eq!(
enabled.tool_usage_contributor_labels(&searchable_tool_name),
["searchable".to_string()].as_slice()
);
}
#[tokio::test]
@@ -706,6 +774,7 @@ async fn code_mode_only_exposes_code_executor_and_hides_nested_tools() {
set_features(turn, &[Feature::CodeMode, Feature::CodeModeOnly]);
},
ToolPlanInputs {
mcp_tools: Some(vec![mcp_tool("nested", "mcp__nested", "lookup")]),
dynamic_tools: vec![dynamic_tool(
Some("codex_app"),
"lookup",
@@ -723,6 +792,10 @@ async fn code_mode_only_exposes_code_executor_and_hides_nested_tools() {
code_mode_only.namespace_function_names("codex_app"),
Vec::<String>::new().as_slice()
);
assert_eq!(
code_mode_only.prompt_usage_contributor_labels,
vec!["nested".to_string()]
);
}
#[tokio::test]

355
codex-rs/core/src/usage.rs Normal file
View File

@@ -0,0 +1,355 @@
use crate::tools::router::ToolRouter;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::UsageAttributionContributor;
use codex_protocol::protocol::UsageAttributionItem;
use codex_protocol::protocol::UsageContributor;
use codex_protocol::protocol::UsageContributorKind;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use codex_utils_output_truncation::approx_token_count;
use std::collections::BTreeMap;
use std::collections::HashMap;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UsagePromptAttribution {
pub(crate) prompt_estimated_tokens: i64,
pub(crate) contributors: Vec<UsagePromptContributor>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct UsagePromptContributor {
pub(crate) contributor: UsageContributor,
pub(crate) source_estimated_tokens: i64,
}
impl UsagePromptAttribution {
pub(crate) fn from_prompt(
input: &[ResponseItem],
tools: &[ToolSpec],
router: &ToolRouter,
base_instructions: &str,
) -> Self {
let mut contributors = skill_contributors(input);
contributors.extend_from_slice(router.usage_contributors());
contributors.extend(tool_result_contributors(input, router));
let input_tokens = input
.iter()
.map(estimate_response_item_tokens)
.fold(0i64, i64::saturating_add);
let tool_tokens = tools
.iter()
.map(estimate_serialized_tokens)
.fold(0i64, i64::saturating_add);
let base_tokens = i64::try_from(approx_token_count(base_instructions)).unwrap_or(i64::MAX);
Self {
prompt_estimated_tokens: base_tokens
.saturating_add(input_tokens)
.saturating_add(tool_tokens),
contributors: aggregate_contributors(contributors),
}
}
pub(crate) fn complete(
&self,
sample_id: String,
turn_id: String,
response_id: String,
occurred_at: i64,
token_usage: TokenUsage,
) -> UsageAttributionItem {
let non_cached_input = token_usage.non_cached_input();
let contributors = self
.contributors
.iter()
.map(|contributor| UsageAttributionContributor {
contributor: contributor.contributor.clone(),
source_estimated_tokens: contributor.source_estimated_tokens,
attributed_tokens: attributable_tokens(
non_cached_input,
contributor.source_estimated_tokens,
self.prompt_estimated_tokens,
),
})
.filter(|contributor| contributor.attributed_tokens > 0)
.collect();
UsageAttributionItem {
sample_id,
turn_id,
response_id,
occurred_at,
token_usage,
prompt_estimated_tokens: self.prompt_estimated_tokens,
contributors,
}
}
}
pub(crate) fn estimate_serialized_tokens<T: serde::Serialize>(value: &T) -> i64 {
serde_json::to_string(value)
.map(|serialized| i64::try_from(approx_token_count(&serialized)).unwrap_or(i64::MAX))
.unwrap_or(/*default*/ 0)
}
fn estimate_response_item_tokens(item: &ResponseItem) -> i64 {
estimate_serialized_tokens(item)
}
fn skill_contributors(input: &[ResponseItem]) -> Vec<UsagePromptContributor> {
input.iter().filter_map(skill_contributor).collect()
}
fn tool_result_contributors(
input: &[ResponseItem],
router: &ToolRouter,
) -> Vec<UsagePromptContributor> {
let contributors_by_call_id = input
.iter()
.filter_map(|item| {
let (call_id, tool_name) = match item {
ResponseItem::FunctionCall {
call_id,
name,
namespace,
..
} => (call_id, ToolName::new(namespace.clone(), name)),
ResponseItem::CustomToolCall { call_id, name, .. } => {
(call_id, ToolName::plain(name))
}
_ => return None,
};
let contributors = router.usage_contributors_for_tool_name(&tool_name);
(!contributors.is_empty()).then(|| (call_id.clone(), contributors))
})
.collect::<HashMap<_, _>>();
input
.iter()
.filter_map(|item| {
let call_id = match item {
ResponseItem::FunctionCallOutput { call_id, .. }
| ResponseItem::CustomToolCallOutput { call_id, .. } => call_id,
_ => return None,
};
let source_estimated_tokens = estimate_response_item_tokens(item);
Some(
contributors_by_call_id
.get(call_id)?
.iter()
.cloned()
.map(move |contributor| UsagePromptContributor {
contributor,
source_estimated_tokens,
}),
)
})
.flatten()
.collect()
}
fn skill_contributor(item: &ResponseItem) -> Option<UsagePromptContributor> {
let ResponseItem::Message { content, .. } = item else {
return None;
};
let text = content.iter().find_map(|content| match content {
ContentItem::InputText { text } if text.contains("<skill>") => Some(text.as_str()),
_ => None,
})?;
let name = tag_contents(text, "name")?;
let path = tag_contents(text, "path")?;
Some(UsagePromptContributor {
contributor: UsageContributor {
kind: UsageContributorKind::Skill,
id: path.to_string(),
label: name.to_string(),
},
source_estimated_tokens: i64::try_from(approx_token_count(text)).unwrap_or(i64::MAX),
})
}
fn tag_contents<'a>(text: &'a str, tag: &str) -> Option<&'a str> {
let open = format!("<{tag}>");
let close = format!("</{tag}>");
let start = text.find(open.as_str())? + open.len();
let end = text[start..].find(close.as_str())? + start;
Some(text[start..end].trim())
}
fn aggregate_contributors(
contributors: Vec<UsagePromptContributor>,
) -> Vec<UsagePromptContributor> {
let mut aggregated = BTreeMap::new();
for contributor in contributors {
let key = (
contributor.contributor.kind as u8,
contributor.contributor.id.clone(),
contributor.contributor.label.clone(),
);
aggregated
.entry(key)
.and_modify(|existing: &mut UsagePromptContributor| {
existing.source_estimated_tokens = existing
.source_estimated_tokens
.saturating_add(contributor.source_estimated_tokens);
})
.or_insert(contributor);
}
aggregated.into_values().collect()
}
fn attributable_tokens(non_cached_input: i64, source_tokens: i64, prompt_tokens: i64) -> i64 {
if non_cached_input <= 0 || source_tokens <= 0 || prompt_tokens <= 0 {
return 0;
}
non_cached_input
.saturating_mul(source_tokens)
.saturating_add(prompt_tokens / 2)
/ prompt_tokens
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::registry::ToolRegistry;
use codex_protocol::models::FunctionCallOutputPayload;
use pretty_assertions::assert_eq;
#[test]
fn complete_attributes_only_non_cached_input_tokens() {
let attribution = UsagePromptAttribution {
prompt_estimated_tokens: 100,
contributors: vec![
usage_prompt_contributor(
UsageContributorKind::Skill,
"/skills/tmux",
"tmux",
/*source_estimated_tokens*/ 25,
),
usage_prompt_contributor(
UsageContributorKind::App,
"slack",
"Slack",
/*source_estimated_tokens*/ 10,
),
],
};
let usage = attribution.complete(
"sample".to_string(),
"turn".to_string(),
"response".to_string(),
/*occurred_at*/ 1_700_000_000,
TokenUsage {
input_tokens: 100,
cached_input_tokens: 40,
output_tokens: 20,
reasoning_output_tokens: 0,
total_tokens: 120,
},
);
assert_eq!(
usage.contributors,
vec![
UsageAttributionContributor {
contributor: usage_contributor(
UsageContributorKind::Skill,
"/skills/tmux",
"tmux",
),
source_estimated_tokens: 25,
attributed_tokens: 15,
},
UsageAttributionContributor {
contributor: usage_contributor(UsageContributorKind::App, "slack", "Slack"),
source_estimated_tokens: 10,
attributed_tokens: 6,
},
]
);
}
#[test]
fn skill_contributors_use_skill_path_as_stable_id() {
let item = ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "<skill><name>tmux</name><path>/skills/tmux/SKILL.md</path></skill>"
.to_string(),
}],
phase: None,
};
assert_eq!(
skill_contributors(&[item]),
vec![UsagePromptContributor {
contributor: usage_contributor(
UsageContributorKind::Skill,
"/skills/tmux/SKILL.md",
"tmux",
),
source_estimated_tokens: i64::try_from(approx_token_count(
"<skill><name>tmux</name><path>/skills/tmux/SKILL.md</path></skill>",
))
.expect("skill prompt token estimate should fit in i64"),
}]
);
}
#[test]
fn tool_results_reuse_tool_usage_provenance() {
let contributor = usage_contributor(UsageContributorKind::App, "slack", "Slack");
let tool_name = ToolName::plain("mcp__slack__search");
let router = ToolRouter::from_parts(
ToolRegistry::from_tools(Vec::<
std::sync::Arc<dyn crate::tools::registry::CoreToolRuntime>,
>::new()),
Vec::new(),
Vec::new(),
HashMap::from([(tool_name.clone(), vec![contributor.clone()])]),
);
let tool_result = ResponseItem::FunctionCallOutput {
call_id: "call-1".to_string(),
output: FunctionCallOutputPayload::from_text("result".to_string()),
};
let input = vec![
ResponseItem::FunctionCall {
id: None,
name: tool_name.name,
namespace: tool_name.namespace,
arguments: "{}".to_string(),
call_id: "call-1".to_string(),
},
tool_result.clone(),
];
assert_eq!(
tool_result_contributors(&input, &router),
vec![UsagePromptContributor {
contributor,
source_estimated_tokens: estimate_response_item_tokens(&tool_result),
}]
);
}
fn usage_prompt_contributor(
kind: UsageContributorKind,
id: &str,
label: &str,
source_estimated_tokens: i64,
) -> UsagePromptContributor {
UsagePromptContributor {
contributor: usage_contributor(kind, id, label),
source_estimated_tokens,
}
}
fn usage_contributor(kind: UsageContributorKind, id: &str, label: &str) -> UsageContributor {
UsageContributor {
kind,
id: id.to_string(),
label: label.to_string(),
}
}
}

View File

@@ -664,6 +664,27 @@ pub fn ev_completed_with_tokens(id: &str, total_tokens: i64) -> Value {
})
}
pub fn ev_completed_with_usage(
id: &str,
input_tokens: i64,
cached_input_tokens: i64,
output_tokens: i64,
) -> Value {
serde_json::json!({
"type": "response.completed",
"response": {
"id": id,
"usage": {
"input_tokens": input_tokens,
"input_tokens_details": { "cached_tokens": cached_input_tokens },
"output_tokens": output_tokens,
"output_tokens_details": null,
"total_tokens": input_tokens + output_tokens
}
}
})
}
/// Convenience: SSE event for a single assistant message output item.
pub fn ev_assistant_message(id: &str, text: &str) -> Value {
serde_json::json!({

View File

@@ -1,6 +1,8 @@
use anyhow::Result;
use codex_config::types::McpServerConfig;
use codex_config::types::McpServerTransportConfig;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::ExecutorFileSystem;
use codex_features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
@@ -15,6 +17,7 @@ use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
@@ -33,10 +36,31 @@ use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::time::Duration;
use tracing_subscriber::prelude::*;
use uuid::Uuid;
async fn write_usage_skill(cwd: AbsolutePathBuf, fs: Arc<dyn ExecutorFileSystem>) -> Result<()> {
let skill_dir = cwd.join(".agents").join("skills").join("usage-e2e");
fs.create_directory(
&skill_dir,
CreateDirectoryOptions { recursive: true },
/*sandbox*/ None,
)
.await?;
fs.write_file(
&skill_dir.join("SKILL.md"),
b"---\nname: usage-e2e\ndescription: usage metric test\n---\n\nUse the rmcp echo tool.\n"
.to_vec(),
/*sandbox*/ None,
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn new_thread_is_recorded_in_state_db() -> Result<()> {
let server = start_mock_server().await;
@@ -516,6 +540,164 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result<
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn usage_records_blended_tokens_for_skill_and_mcp_context() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let call_id = "call-usage";
let server_name = "rmcp";
let namespace = format!("mcp__{server_name}");
mount_sse_once(
&server,
responses::sse(vec![
ev_response_created("resp-usage-1"),
responses::ev_function_call_with_namespace(
call_id,
&namespace,
"echo",
"{\"message\":\"ping\"}",
),
responses::ev_completed_with_usage(
"resp-usage-1",
/*input_tokens*/ 1_200,
/*cached_input_tokens*/ 200,
/*output_tokens*/ 100,
),
]),
)
.await;
mount_sse_once(
&server,
responses::sse(vec![
responses::ev_assistant_message("msg-usage", "rmcp echo tool completed."),
responses::ev_completed_with_usage(
"resp-usage-2",
/*input_tokens*/ 1_400,
/*cached_input_tokens*/ 1_000,
/*output_tokens*/ 80,
),
]),
)
.await;
let rmcp_test_server_bin = stdio_server_bin()?;
let mut builder = test_codex()
.with_workspace_setup(|cwd, fs| async move { write_usage_skill(cwd, fs).await })
.with_config(move |config| {
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow feature update");
let mut servers = config.mcp_servers.get().clone();
servers.insert(
server_name.to_string(),
McpServerConfig {
transport: McpServerTransportConfig::Stdio {
command: rmcp_test_server_bin,
args: Vec::new(),
env: Some(HashMap::from([(
"MCP_TEST_VALUE".to_string(),
"propagated-env".to_string(),
)])),
env_vars: Vec::new(),
cwd: None,
},
environment_id: "local".to_string(),
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: Some(Duration::from_secs(10)),
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth: None,
oauth_resource: None,
tools: HashMap::new(),
},
);
config
.mcp_servers
.set(servers)
.expect("test mcp servers should accept any configuration");
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let cwd = test.cwd_path().to_path_buf();
let skill_path = cwd
.join(".agents/skills/usage-e2e/SKILL.md")
.canonicalize()
.unwrap_or_else(|_| cwd.join(".agents/skills/usage-e2e/SKILL.md"))
.to_path_buf();
let (sandbox_policy, permission_profile) =
turn_permission_fields(PermissionProfile::read_only(), cwd.as_path());
test.codex
.submit(Op::UserInput {
items: vec![
UserInput::Text {
text: "use $usage-e2e and call the rmcp echo tool".to_string(),
text_elements: Vec::new(),
},
UserInput::Skill {
name: "usage-e2e".to_string(),
path: skill_path.clone(),
},
],
environments: None,
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: codex_protocol::protocol::ThreadSettingsOverrides {
cwd: Some(cwd),
approval_policy: Some(AskForApproval::Never),
sandbox_policy: Some(sandbox_policy),
permission_profile,
collaboration_mode: Some(codex_protocol::config_types::CollaborationMode {
mode: codex_protocol::config_types::ModeKind::Default,
settings: codex_protocol::config_types::Settings {
model: test.session_configured.model.clone(),
reasoning_effort: None,
developer_instructions: None,
},
}),
..Default::default()
},
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::McpToolCallEnd(_))
})
.await;
wait_for_event_match(&test.codex, |event| match event {
EventMsg::Error(err) => Some(Err(anyhow::anyhow!(err.message.clone()))),
EventMsg::TurnComplete(_) => Some(Ok(())),
_ => None,
})
.await?;
let now = i64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())? + 1;
let report = db
.read_usage_report(codex_state::UsageRange::Day, now)
.await?;
assert_eq!(report.total_tokens, 1_580);
assert!(report.tracked_from.is_some());
assert_eq!(report.skills.len(), 1);
assert_eq!(report.skills[0].label, "usage-e2e");
assert_eq!(report.skills[0].id, skill_path.to_string_lossy());
assert!(report.skills[0].attributed_tokens > 0);
assert_eq!(report.mcp_servers.len(), 1);
assert_eq!(report.mcp_servers[0].label, server_name);
assert!(report.mcp_servers[0].attributed_tokens > 0);
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn tool_call_logs_include_thread_id() -> Result<()> {
let server = start_mock_server().await;

View File

@@ -1925,6 +1925,47 @@ pub struct TokenUsage {
pub total_tokens: i64,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Hash, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum UsageContributorKind {
Skill,
Subagent,
AgentTask,
App,
McpServer,
Plugin,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct UsageContributor {
pub kind: UsageContributorKind,
pub id: String,
pub label: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct UsageAttributionContributor {
pub contributor: UsageContributor,
#[ts(type = "number")]
pub source_estimated_tokens: i64,
#[ts(type = "number")]
pub attributed_tokens: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct UsageAttributionItem {
pub sample_id: String,
pub turn_id: String,
pub response_id: String,
#[ts(type = "number")]
pub occurred_at: i64,
pub token_usage: TokenUsage,
#[ts(type = "number")]
pub prompt_estimated_tokens: i64,
pub contributors: Vec<UsageAttributionContributor>,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct TokenUsageInfo {
pub total_token_usage: TokenUsage,

View File

@@ -0,0 +1,29 @@
CREATE TABLE usage_samples (
sample_id TEXT PRIMARY KEY,
thread_id TEXT NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
turn_id TEXT NOT NULL,
response_id TEXT NOT NULL,
occurred_at INTEGER NOT NULL,
input_tokens INTEGER NOT NULL,
cached_input_tokens INTEGER NOT NULL,
non_cached_input_tokens INTEGER NOT NULL,
output_tokens INTEGER NOT NULL,
reasoning_output_tokens INTEGER NOT NULL,
total_tokens INTEGER NOT NULL,
blended_tokens INTEGER NOT NULL,
prompt_estimated_tokens INTEGER NOT NULL
);
CREATE TABLE usage_sample_contributors (
sample_id TEXT NOT NULL REFERENCES usage_samples(sample_id) ON DELETE CASCADE,
kind TEXT NOT NULL,
contributor_id TEXT NOT NULL,
label TEXT NOT NULL,
source_estimated_tokens INTEGER NOT NULL,
attributed_tokens INTEGER NOT NULL,
PRIMARY KEY (sample_id, kind, contributor_id)
);
CREATE INDEX idx_usage_samples_occurred_at ON usage_samples(occurred_at);
CREATE INDEX idx_usage_samples_thread_occurred_at ON usage_samples(thread_id, occurred_at);
CREATE INDEX idx_usage_sample_contributors_kind ON usage_sample_contributors(kind, contributor_id);

View File

@@ -51,6 +51,11 @@ pub use model::ThreadGoalStatus;
pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use model::UsageEntry;
pub use model::UsageHeadline;
pub use model::UsageRange;
pub use model::UsageReport;
pub use model::UsageSample;
pub use runtime::GoalAccountingMode;
pub use runtime::GoalAccountingOutcome;
pub use runtime::GoalStore;

View File

@@ -5,6 +5,7 @@ mod log;
mod memories;
mod thread_goal;
mod thread_metadata;
mod usage;
pub use agent_job::AgentJob;
pub use agent_job::AgentJobCreateParams;
@@ -34,6 +35,11 @@ pub use thread_metadata::SortKey;
pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;
pub use usage::UsageEntry;
pub use usage::UsageHeadline;
pub use usage::UsageRange;
pub use usage::UsageReport;
pub use usage::UsageSample;
pub(crate) use agent_job::AgentJobItemRow;
pub(crate) use agent_job::AgentJobRow;

View File

@@ -0,0 +1,53 @@
use codex_protocol::protocol::UsageAttributionItem;
use codex_protocol::protocol::UsageContributorKind;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UsageRange {
Day,
Week,
}
impl UsageRange {
pub(crate) fn seconds(self) -> i64 {
match self {
Self::Day => 24 * 60 * 60,
Self::Week => 7 * 24 * 60 * 60,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageEntry {
pub kind: UsageContributorKind,
pub id: String,
pub label: String,
pub attributed_tokens: i64,
pub percent_of_usage: u8,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageHeadline {
pub entry: UsageEntry,
pub note: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageReport {
pub range: UsageRange,
pub generated_at: i64,
pub tracked_from: Option<i64>,
pub total_tokens: i64,
pub headline: Option<UsageHeadline>,
pub skills: Vec<UsageEntry>,
pub subagents: Vec<UsageEntry>,
pub agent_tasks: Vec<UsageEntry>,
pub apps: Vec<UsageEntry>,
pub mcp_servers: Vec<UsageEntry>,
pub plugins: Vec<UsageEntry>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageSample {
pub thread_id: codex_protocol::ThreadId,
pub attribution: UsageAttributionItem,
}

View File

@@ -66,6 +66,7 @@ mod remote_control;
#[cfg(test)]
mod test_support;
mod threads;
mod usage;
pub use goals::GoalAccountingMode;
pub use goals::GoalAccountingOutcome;
@@ -267,6 +268,12 @@ impl StateRuntime {
logs_path.display(),
);
}
if let Err(err) = runtime.run_usage_startup_maintenance().await {
warn!(
"failed to run startup maintenance for usage data in state db at {}: {err}",
state_path.display(),
);
}
Ok(runtime)
}

File diff suppressed because it is too large Load Diff