[app-server] add thread/tokenUsage/updated v2 event (#7268)

the TokenEvent event message becomes `thread/tokenUsage/updated` in v2.
before & after:
```
< {
<   "method": "codex/event/token_count",
<   "params": {
<     "conversationId": "019ab891-4c55-7790-9670-6c3b48c33281",
<     "id": "1",
<     "msg": {
<       "info": {
<         "last_token_usage": {
<           "cached_input_tokens": 3072,
<           "input_tokens": 5152,
<           "output_tokens": 16,
<           "reasoning_output_tokens": 0,
<           "total_tokens": 5168
<         },
<         "model_context_window": 258400,
<         "total_token_usage": {
<           "cached_input_tokens": 3072,
<           "input_tokens": 5152,
<           "output_tokens": 16,
<           "reasoning_output_tokens": 0,
<           "total_tokens": 5168
<         }
<       },
<       "rate_limits": {
<         "credits": null,
<         "primary": null,
<         "secondary": null
<       },
<       "type": "token_count"
<     }
<   }
< }
< {
<   "method": "thread/tokenUsage/updated",
<   "params": {
<     "threadId": "019ab891-4c55-7790-9670-6c3b48c33281",
<     "tokenUsage": {
<       "last": {
<         "cachedInputTokens": 3072,
<         "inputTokens": 5152,
<         "outputTokens": 16,
<         "reasoningOutputTokens": 0,
<         "totalTokens": 5168
<       },
<       "modelContextWindow": 258400,
<       "total": {
<         "cachedInputTokens": 3072,
<         "inputTokens": 5152,
<         "outputTokens": 16,
<         "reasoningOutputTokens": 0,
<         "totalTokens": 5168
<       }
<     },
<     "turnId": "1"
<   }
< }
```
This commit is contained in:
Celia Chen
2025-11-25 11:56:04 -08:00
committed by GitHub
parent 865e225bde
commit 401f94ca31
4 changed files with 287 additions and 64 deletions

View File

@@ -502,6 +502,7 @@ server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadTokenUsageUpdated => "thread/tokenUsage/updated" (v2::ThreadTokenUsageUpdatedNotification),
TurnStarted => "turn/started" (v2::TurnStartedNotification),
TurnCompleted => "turn/completed" (v2::TurnCompletedNotification),
TurnDiffUpdated => "turn/diff/updated" (v2::TurnDiffUpdatedNotification),

View File

@@ -16,6 +16,8 @@ use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow;
use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::protocol::TokenUsage as CoreTokenUsage;
use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo;
use codex_protocol::user_input::UserInput as CoreUserInput;
use mcp_types::ContentBlock as McpContentBlock;
use schemars::JsonSchema;
@@ -780,6 +782,63 @@ pub struct AccountUpdatedNotification {
pub auth_mode: Option<AuthMode>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadTokenUsageUpdatedNotification {
pub thread_id: String,
pub turn_id: String,
pub token_usage: ThreadTokenUsage,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadTokenUsage {
pub total: TokenUsageBreakdown,
pub last: TokenUsageBreakdown,
#[ts(type = "number | null")]
pub model_context_window: Option<i64>,
}
impl From<CoreTokenUsageInfo> for ThreadTokenUsage {
fn from(value: CoreTokenUsageInfo) -> Self {
Self {
total: value.total_token_usage.into(),
last: value.last_token_usage.into(),
model_context_window: value.model_context_window,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TokenUsageBreakdown {
#[ts(type = "number")]
pub total_tokens: i64,
#[ts(type = "number")]
pub input_tokens: i64,
#[ts(type = "number")]
pub cached_input_tokens: i64,
#[ts(type = "number")]
pub output_tokens: i64,
#[ts(type = "number")]
pub reasoning_output_tokens: i64,
}
impl From<CoreTokenUsage> for TokenUsageBreakdown {
fn from(value: CoreTokenUsage) -> Self {
Self {
total_tokens: value.total_tokens,
input_tokens: value.input_tokens,
cached_input_tokens: value.cached_input_tokens,
output_tokens: value.output_tokens,
reasoning_output_tokens: value.reasoning_output_tokens,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -240,7 +240,7 @@ Event notifications are the server-initiated event stream for thread lifecycles,
### Turn events
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` plus token `usage`), and clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
- `turn/started``{ turn }` with the turn id, empty `items`, and `status: "inProgress"`.
- `turn/completed``{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo? } }`.

View File

@@ -36,6 +36,8 @@ use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAsses
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadTokenUsage;
use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnDiffUpdatedNotification;
@@ -54,6 +56,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_core::protocol::TokenCountEvent;
use codex_core::protocol::TurnDiffEvent;
use codex_core::review_format::format_review_findings_block;
use codex_protocol::ConversationId;
@@ -76,10 +79,19 @@ pub(crate) async fn apply_bespoke_event_handling(
turn_summary_store: TurnSummaryStore,
api_version: ApiVersion,
) {
let Event { id: event_id, msg } = event;
let Event {
id: event_turn_id,
msg,
} = event;
match msg {
EventMsg::TaskComplete(_ev) => {
handle_turn_complete(conversation_id, event_id, &outgoing, &turn_summary_store).await;
handle_turn_complete(
conversation_id,
event_turn_id,
&outgoing,
&turn_summary_store,
)
.await;
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
@@ -100,7 +112,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
on_patch_approval_response(event_turn_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
@@ -122,7 +134,7 @@ pub(crate) async fn apply_bespoke_event_handling(
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -142,7 +154,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(
event_id,
event_turn_id,
conversation_id,
item_id,
patch_changes,
@@ -178,7 +190,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.await;
tokio::spawn(async move {
on_exec_approval_response(event_id, rx, conversation).await;
on_exec_approval_response(event_turn_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
@@ -206,7 +218,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
tokio::spawn(async move {
on_command_execution_request_approval_response(
event_id,
event_turn_id,
conversation_id,
item_id,
command_string,
@@ -225,7 +237,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let notification = construct_mcp_tool_call_notification(
begin_event,
conversation_id.to_string(),
event_id.clone(),
event_turn_id.clone(),
)
.await;
outgoing
@@ -236,7 +248,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let notification = construct_mcp_tool_call_end_notification(
end_event,
conversation_id.to_string(),
event_id.clone(),
event_turn_id.clone(),
)
.await;
outgoing
@@ -255,7 +267,7 @@ pub(crate) async fn apply_bespoke_event_handling(
EventMsg::ContextCompacted(..) => {
let notification = ContextCompactedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
};
outgoing
.send_server_notification(ServerNotification::ContextCompacted(notification))
@@ -295,15 +307,8 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::TokenCount(token_count_event) => {
if let Some(rate_limits) = token_count_event.rate_limits {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(
AccountRateLimitsUpdatedNotification {
rate_limits: rate_limits.into(),
},
))
.await;
}
handle_token_count_event(conversation_id, event_turn_id, token_count_event, &outgoing)
.await;
}
EventMsg::Error(ev) => {
let turn_error = TurnError {
@@ -315,7 +320,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error,
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
}))
.await;
}
@@ -330,16 +335,16 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error,
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
}))
.await;
}
EventMsg::EnteredReviewMode(review_request) => {
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item: ThreadItem::CodeReview {
id: event_id.clone(),
id: event_turn_id.clone(),
review: review_request.user_facing_hint,
},
};
@@ -351,7 +356,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let item: ThreadItem = item_started_event.item.clone().into();
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -362,7 +367,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let item: ThreadItem = item_completed_event.item.clone().into();
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -374,10 +379,10 @@ pub(crate) async fn apply_bespoke_event_handling(
Some(output) => render_review_output_text(&output),
None => REVIEW_FALLBACK_MESSAGE.to_string(),
};
let review_item_id = event_id.clone();
let review_item_id = event_turn_id.clone();
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item: ThreadItem::CodeReview {
id: review_item_id,
review: review_text,
@@ -405,7 +410,7 @@ pub(crate) async fn apply_bespoke_event_handling(
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -429,7 +434,7 @@ pub(crate) async fn apply_bespoke_event_handling(
item_id,
changes,
status,
event_id.clone(),
event_turn_id.clone(),
outgoing.as_ref(),
&turn_summary_store,
)
@@ -457,7 +462,7 @@ pub(crate) async fn apply_bespoke_event_handling(
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -518,7 +523,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
@@ -548,11 +553,22 @@ pub(crate) async fn apply_bespoke_event_handling(
}
}
handle_turn_interrupted(conversation_id, event_id, &outgoing, &turn_summary_store)
.await;
handle_turn_interrupted(
conversation_id,
event_turn_id,
&outgoing,
&turn_summary_store,
)
.await;
}
EventMsg::TurnDiff(turn_diff_event) => {
handle_turn_diff(&event_id, turn_diff_event, api_version, outgoing.as_ref()).await;
handle_turn_diff(
&event_turn_id,
turn_diff_event,
api_version,
outgoing.as_ref(),
)
.await;
}
_ => {}
@@ -560,14 +576,14 @@ pub(crate) async fn apply_bespoke_event_handling(
}
async fn handle_turn_diff(
event_id: &str,
event_turn_id: &str,
turn_diff_event: TurnDiffEvent,
api_version: ApiVersion,
outgoing: &OutgoingMessageSender,
) {
if let ApiVersion::V2 = api_version {
let notification = TurnDiffUpdatedNotification {
turn_id: event_id.to_string(),
turn_id: event_turn_id.to_string(),
diff: turn_diff_event.unified_diff,
};
outgoing
@@ -578,14 +594,14 @@ async fn handle_turn_diff(
async fn emit_turn_completed_with_status(
conversation_id: ConversationId,
event_id: String,
event_turn_id: String,
status: TurnStatus,
outgoing: &OutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
thread_id: conversation_id.to_string(),
turn: Turn {
id: event_id,
id: event_turn_id,
items: vec![],
status,
},
@@ -667,7 +683,7 @@ async fn find_and_remove_turn_summary(
async fn handle_turn_complete(
conversation_id: ConversationId,
event_id: String,
event_turn_id: String,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
@@ -679,19 +695,52 @@ async fn handle_turn_complete(
TurnStatus::Completed
};
emit_turn_completed_with_status(conversation_id, event_id, status, outgoing).await;
emit_turn_completed_with_status(conversation_id, event_turn_id, status, outgoing).await;
}
async fn handle_turn_interrupted(
conversation_id: ConversationId,
event_id: String,
event_turn_id: String,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
emit_turn_completed_with_status(conversation_id, event_id, TurnStatus::Interrupted, outgoing)
.await;
emit_turn_completed_with_status(
conversation_id,
event_turn_id,
TurnStatus::Interrupted,
outgoing,
)
.await;
}
async fn handle_token_count_event(
conversation_id: ConversationId,
turn_id: String,
token_count_event: TokenCountEvent,
outgoing: &OutgoingMessageSender,
) {
let TokenCountEvent { info, rate_limits } = token_count_event;
if let Some(token_usage) = info.map(ThreadTokenUsage::from) {
let notification = ThreadTokenUsageUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id,
token_usage,
};
outgoing
.send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification))
.await;
}
if let Some(rate_limits) = rate_limits {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(
AccountRateLimitsUpdatedNotification {
rate_limits: rate_limits.into(),
},
))
.await;
}
}
async fn handle_error(
@@ -704,7 +753,7 @@ async fn handle_error(
}
async fn on_patch_approval_response(
event_id: String,
event_turn_id: String,
receiver: oneshot::Receiver<JsonValue>,
codex: Arc<CodexConversation>,
) {
@@ -715,7 +764,7 @@ async fn on_patch_approval_response(
error!("request failed: {err:?}");
if let Err(submit_err) = codex
.submit(Op::PatchApproval {
id: event_id.clone(),
id: event_turn_id.clone(),
decision: ReviewDecision::Denied,
})
.await
@@ -736,7 +785,7 @@ async fn on_patch_approval_response(
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
id: event_turn_id,
decision: response.decision,
})
.await
@@ -746,7 +795,7 @@ async fn on_patch_approval_response(
}
async fn on_exec_approval_response(
event_id: String,
event_turn_id: String,
receiver: oneshot::Receiver<JsonValue>,
conversation: Arc<CodexConversation>,
) {
@@ -772,7 +821,7 @@ async fn on_exec_approval_response(
if let Err(err) = conversation
.submit(Op::ExecApproval {
id: event_id,
id: event_turn_id,
decision: response.decision,
})
.await
@@ -845,7 +894,7 @@ fn format_file_change_diff(change: &CoreFileChange) -> String {
#[allow(clippy::too_many_arguments)]
async fn on_file_change_request_approval_response(
event_id: String,
event_turn_id: String,
conversation_id: ConversationId,
item_id: String,
changes: Vec<FileUpdateChange>,
@@ -890,7 +939,7 @@ async fn on_file_change_request_approval_response(
item_id,
changes,
status,
event_id.clone(),
event_turn_id.clone(),
outgoing.as_ref(),
&turn_summary_store,
)
@@ -899,7 +948,7 @@ async fn on_file_change_request_approval_response(
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
id: event_turn_id,
decision,
})
.await
@@ -910,7 +959,7 @@ async fn on_file_change_request_approval_response(
#[allow(clippy::too_many_arguments)]
async fn on_command_execution_request_approval_response(
event_id: String,
event_turn_id: String,
conversation_id: ConversationId,
item_id: String,
command: String,
@@ -962,7 +1011,7 @@ async fn on_command_execution_request_approval_response(
if let Some(status) = completion_status {
complete_command_execution_item(
conversation_id,
event_id.clone(),
event_turn_id.clone(),
item_id.clone(),
command.clone(),
cwd.clone(),
@@ -975,7 +1024,7 @@ async fn on_command_execution_request_approval_response(
if let Err(err) = conversation
.submit(Op::ExecApproval {
id: event_id,
id: event_turn_id,
decision,
})
.await
@@ -1059,7 +1108,12 @@ mod tests {
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::McpInvocation;
use codex_core::protocol::RateLimitSnapshot;
use codex_core::protocol::RateLimitWindow;
use codex_core::protocol::TokenUsage;
use codex_core::protocol::TokenUsageInfo;
use mcp_types::CallToolResult;
use mcp_types::ContentBlock;
use mcp_types::TextContent;
@@ -1103,14 +1157,14 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "complete1".to_string();
let event_turn_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let turn_summary_store = new_turn_summary_store();
handle_turn_complete(
conversation_id,
event_id.clone(),
event_turn_id.clone(),
&outgoing,
&turn_summary_store,
)
@@ -1122,7 +1176,7 @@ mod tests {
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
@@ -1134,7 +1188,7 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "interrupt1".to_string();
let event_turn_id = "interrupt1".to_string();
let turn_summary_store = new_turn_summary_store();
handle_error(
conversation_id,
@@ -1150,7 +1204,7 @@ mod tests {
handle_turn_interrupted(
conversation_id,
event_id.clone(),
event_turn_id.clone(),
&outgoing,
&turn_summary_store,
)
@@ -1162,7 +1216,7 @@ mod tests {
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Interrupted);
}
other => bail!("unexpected message: {other:?}"),
@@ -1174,7 +1228,7 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_complete_emits_failed_with_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "complete_err1".to_string();
let event_turn_id = "complete_err1".to_string();
let turn_summary_store = new_turn_summary_store();
handle_error(
conversation_id,
@@ -1190,7 +1244,7 @@ mod tests {
handle_turn_complete(
conversation_id,
event_id.clone(),
event_turn_id.clone(),
&outgoing,
&turn_summary_store,
)
@@ -1202,7 +1256,7 @@ mod tests {
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(
n.turn.status,
TurnStatus::Failed {
@@ -1219,6 +1273,115 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> {
let conversation_id = ConversationId::new();
let turn_id = "turn-123".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let info = TokenUsageInfo {
total_token_usage: TokenUsage {
input_tokens: 100,
cached_input_tokens: 25,
output_tokens: 50,
reasoning_output_tokens: 9,
total_tokens: 200,
},
last_token_usage: TokenUsage {
input_tokens: 10,
cached_input_tokens: 5,
output_tokens: 7,
reasoning_output_tokens: 1,
total_tokens: 23,
},
model_context_window: Some(4096),
};
let rate_limits = RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 42.5,
window_minutes: Some(15),
resets_at: Some(1700000000),
}),
secondary: None,
credits: Some(CreditsSnapshot {
has_credits: true,
unlimited: false,
balance: Some("5".to_string()),
}),
};
handle_token_count_event(
conversation_id,
turn_id.clone(),
TokenCountEvent {
info: Some(info),
rate_limits: Some(rate_limits),
},
&outgoing,
)
.await;
let first = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected usage notification"))?;
match first {
OutgoingMessage::AppServerNotification(
ServerNotification::ThreadTokenUsageUpdated(payload),
) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, turn_id);
let usage = payload.token_usage;
assert_eq!(usage.total.total_tokens, 200);
assert_eq!(usage.total.cached_input_tokens, 25);
assert_eq!(usage.last.output_tokens, 7);
assert_eq!(usage.model_context_window, Some(4096));
}
other => bail!("unexpected notification: {other:?}"),
}
let second = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected rate limit notification"))?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(payload),
) => {
assert!(payload.rate_limits.primary.is_some());
assert!(payload.rate_limits.credits.is_some());
}
other => bail!("unexpected notification: {other:?}"),
}
Ok(())
}
#[tokio::test]
async fn test_handle_token_count_event_without_usage_info() -> Result<()> {
let conversation_id = ConversationId::new();
let turn_id = "turn-456".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
handle_token_count_event(
conversation_id,
turn_id.clone(),
TokenCountEvent {
info: None,
rate_limits: None,
},
&outgoing,
)
.await;
assert!(
rx.try_recv().is_err(),
"no notifications should be emitted when token usage info is absent"
);
Ok(())
}
#[tokio::test]
async fn test_construct_mcp_tool_call_begin_notification_with_args() {
let begin_event = McpToolCallBeginEvent {