demultiplex by convo id

This commit is contained in:
Owen Lin
2025-10-21 15:04:40 -07:00
parent 9ce2a2e992
commit 2675d37ed3

View File

@@ -1474,13 +1474,13 @@ async fn apply_rate_limit_notification(
event: Event,
outgoing: Arc<OutgoingMessageSender>,
latest_rate_limits: Arc<Mutex<Option<RateLimitSnapshot>>>,
) -> bool {
) {
let Event { msg, .. } = event;
let EventMsg::TokenCount(token_count) = msg else {
return false;
return;
};
let Some(snapshot) = token_count.rate_limits else {
return false;
return;
};
// Only notify when the snapshot changes to avoid redundant traffic.
@@ -1499,8 +1499,6 @@ async fn apply_rate_limit_notification(
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(snapshot))
.await;
}
should_send
}
async fn derive_config_from_params(
@@ -1676,6 +1674,7 @@ mod tests {
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use crate::outgoing_message::OutgoingMessage;
use codex_protocol::protocol::RateLimitWindow;
@@ -1748,10 +1747,8 @@ mod tests {
}),
};
assert!(
apply_rate_limit_notification(event_one, outgoing.clone(), latest_rate_limits.clone())
.await
);
apply_rate_limit_notification(event_one, outgoing.clone(), latest_rate_limits.clone())
.await;
let message = rx.recv().await.context("receive first notification")?;
match message {
@@ -1769,14 +1766,10 @@ mod tests {
}),
};
assert!(
!apply_rate_limit_notification(event_dup, outgoing.clone(), latest_rate_limits.clone())
.await
);
apply_rate_limit_notification(event_dup, outgoing.clone(), latest_rate_limits.clone())
.await;
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after second snapshot: {unexpected:?}");
}
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
let snapshot_two = RateLimitSnapshot {
primary: Some(RateLimitWindow {
@@ -1794,14 +1787,12 @@ mod tests {
}),
};
assert!(
apply_rate_limit_notification(
event_two.clone(),
outgoing.clone(),
latest_rate_limits.clone(),
)
.await
);
apply_rate_limit_notification(
event_two.clone(),
outgoing.clone(),
latest_rate_limits.clone(),
)
.await;
let message = rx.recv().await.context("receive second notification")?;
match message {
@@ -1811,16 +1802,10 @@ mod tests {
other => panic!("unexpected message: {other:?}"),
}
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after duplicate snapshot: {unexpected:?}");
}
// Re-emitting the same snapshot, even if a different conversation produced it,
// should be suppressed by the global tracker.
assert!(!apply_rate_limit_notification(event_two, outgoing, latest_rate_limits).await);
if let Ok(unexpected) = rx.try_recv() {
panic!("unexpected extra message after duplicate snapshot: {unexpected:?}");
}
apply_rate_limit_notification(event_two, outgoing, latest_rate_limits).await;
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
Ok(())
}