[app-server] account rate limits updated event

This commit is contained in:
Owen Lin
2025-10-21 11:46:28 -07:00
parent 26f314904a
commit 9ce2a2e992
4 changed files with 156 additions and 4 deletions

View File

@@ -137,6 +137,7 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
latest_rate_limits: Arc<Mutex<Option<RateLimitSnapshot>>>,
}
impl CodexMessageProcessor {
@@ -157,6 +158,7 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
latest_rate_limits: Arc::new(Mutex::new(None)),
}
}
@@ -1257,6 +1259,7 @@ impl CodexMessageProcessor {
.insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone();
let latest_rate_limits = self.latest_rate_limits.clone();
tokio::spawn(async move {
loop {
tokio::select! {
@@ -1298,6 +1301,7 @@ impl CodexMessageProcessor {
.await;
apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await;
apply_rate_limit_notification(event, outgoing_for_task.clone(), latest_rate_limits.clone()).await;
}
}
}
@@ -1466,6 +1470,39 @@ async fn apply_bespoke_event_handling(
}
}
async fn apply_rate_limit_notification(
event: Event,
outgoing: Arc<OutgoingMessageSender>,
latest_rate_limits: Arc<Mutex<Option<RateLimitSnapshot>>>,
) -> bool {
let Event { msg, .. } = event;
let EventMsg::TokenCount(token_count) = msg else {
return false;
};
let Some(snapshot) = token_count.rate_limits else {
return false;
};
// Only notify when the snapshot changes to avoid redundant traffic.
let should_send = {
let mut guard = latest_rate_limits.lock().await;
if guard.as_ref() == Some(&snapshot) {
false
} else {
*guard = Some(snapshot.clone());
true
}
};
if should_send {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(snapshot))
.await;
}
should_send
}
async fn derive_config_from_params(
params: NewConversationParams,
codex_linux_sandbox_exe: Option<PathBuf>,
@@ -1633,9 +1670,16 @@ fn extract_conversation_summary(
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context as _;
use anyhow::Result;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::outgoing_message::OutgoingMessage;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::TokenCountEvent;
#[test]
fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
@@ -1681,4 +1725,103 @@ mod tests {
assert_eq!(summary.preview, "Count to 5");
Ok(())
}
#[tokio::test]
async fn apply_rate_limit_notification_emits_on_change_only() -> Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel();
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let latest_rate_limits = Arc::new(Mutex::new(None));
let snapshot_one = RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 10.0,
window_minutes: Some(60),
resets_at: Some(1_111),
}),
secondary: None,
};
let event_one = Event {
id: "evt-1".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_one.clone()),
}),
};
assert!(
apply_rate_limit_notification(event_one, outgoing.clone(), latest_rate_limits.clone())
.await
);
let message = rx.recv().await.context("receive first notification")?;
match message {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(received),
) => assert_eq!(received, snapshot_one),
other => panic!("unexpected message: {other:?}"),
}
let event_dup = Event {
id: "evt-2".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_one.clone()),
}),
};
assert!(
!apply_rate_limit_notification(event_dup, outgoing.clone(), latest_rate_limits.clone())
.await
);
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after second snapshot: {unexpected:?}");
}
let snapshot_two = RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 20.0,
window_minutes: Some(120),
resets_at: Some(2_222),
}),
secondary: None,
};
let event_two = Event {
id: "evt-3".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_two.clone()),
}),
};
assert!(
apply_rate_limit_notification(
event_two.clone(),
outgoing.clone(),
latest_rate_limits.clone(),
)
.await
);
let message = rx.recv().await.context("receive second notification")?;
match message {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(received),
) => assert_eq!(received, snapshot_two),
other => panic!("unexpected message: {other:?}"),
}
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after duplicate snapshot: {unexpected:?}");
}
// Re-emitting the same snapshot, even if a different conversation produced it,
// should be suppressed by the global tracker.
assert!(!apply_rate_limit_notification(event_two, outgoing, latest_rate_limits).await);
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after duplicate snapshot: {unexpected:?}");
}
Ok(())
}
}