Compare commits

...

2 Commits

Author SHA1 Message Date
Owen Lin
2675d37ed3 demultiplex by convo id 2025-10-21 15:04:40 -07:00
Owen Lin
9ce2a2e992 [app-server] account rate limits updated event 2025-10-21 14:53:12 -07:00
4 changed files with 141 additions and 4 deletions

View File

@@ -883,6 +883,10 @@ pub enum ServerNotification {
/// The special session configured event for a new or resumed conversation.
SessionConfigured(SessionConfiguredNotification),
#[serde(rename = "account/rateLimits/updated")]
#[ts(rename = "account/rateLimits/updated")]
#[strum(serialize = "account/rateLimits/updated")]
AccountRateLimitsUpdated(RateLimitSnapshot),
}
impl ServerNotification {
@@ -891,6 +895,7 @@ impl ServerNotification {
ServerNotification::AuthStatusChange(params) => serde_json::to_value(params),
ServerNotification::LoginChatGptComplete(params) => serde_json::to_value(params),
ServerNotification::SessionConfigured(params) => serde_json::to_value(params),
ServerNotification::AccountRateLimitsUpdated(params) => serde_json::to_value(params),
}
}
}

View File

@@ -137,6 +137,7 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
latest_rate_limits: Arc<Mutex<Option<RateLimitSnapshot>>>,
}
impl CodexMessageProcessor {
@@ -157,6 +158,7 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
latest_rate_limits: Arc::new(Mutex::new(None)),
}
}
@@ -1257,6 +1259,7 @@ impl CodexMessageProcessor {
.insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone();
let latest_rate_limits = self.latest_rate_limits.clone();
tokio::spawn(async move {
loop {
tokio::select! {
@@ -1298,6 +1301,7 @@ impl CodexMessageProcessor {
.await;
apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await;
apply_rate_limit_notification(event, outgoing_for_task.clone(), latest_rate_limits.clone()).await;
}
}
}
@@ -1466,6 +1470,37 @@ async fn apply_bespoke_event_handling(
}
}
async fn apply_rate_limit_notification(
event: Event,
outgoing: Arc<OutgoingMessageSender>,
latest_rate_limits: Arc<Mutex<Option<RateLimitSnapshot>>>,
) {
let Event { msg, .. } = event;
let EventMsg::TokenCount(token_count) = msg else {
return;
};
let Some(snapshot) = token_count.rate_limits else {
return;
};
// Only notify when the snapshot changes to avoid redundant traffic.
let should_send = {
let mut guard = latest_rate_limits.lock().await;
if guard.as_ref() == Some(&snapshot) {
false
} else {
*guard = Some(snapshot.clone());
true
}
};
if should_send {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(snapshot))
.await;
}
}
async fn derive_config_from_params(
params: NewConversationParams,
codex_linux_sandbox_exe: Option<PathBuf>,
@@ -1633,9 +1668,17 @@ fn extract_conversation_summary(
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context as _;
use anyhow::Result;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use crate::outgoing_message::OutgoingMessage;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::TokenCountEvent;
#[test]
fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
@@ -1681,4 +1724,89 @@ mod tests {
assert_eq!(summary.preview, "Count to 5");
Ok(())
}
#[tokio::test]
async fn apply_rate_limit_notification_emits_on_change_only() -> Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel();
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let latest_rate_limits = Arc::new(Mutex::new(None));
let snapshot_one = RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 10.0,
window_minutes: Some(60),
resets_at: Some(1_111),
}),
secondary: None,
};
let event_one = Event {
id: "evt-1".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_one.clone()),
}),
};
apply_rate_limit_notification(event_one, outgoing.clone(), latest_rate_limits.clone())
.await;
let message = rx.recv().await.context("receive first notification")?;
match message {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(received),
) => assert_eq!(received, snapshot_one),
other => panic!("unexpected message: {other:?}"),
}
let event_dup = Event {
id: "evt-2".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_one.clone()),
}),
};
apply_rate_limit_notification(event_dup, outgoing.clone(), latest_rate_limits.clone())
.await;
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
let snapshot_two = RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 20.0,
window_minutes: Some(120),
resets_at: Some(2_222),
}),
secondary: None,
};
let event_two = Event {
id: "evt-3".to_string(),
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: Some(snapshot_two.clone()),
}),
};
apply_rate_limit_notification(
event_two.clone(),
outgoing.clone(),
latest_rate_limits.clone(),
)
.await;
let message = rx.recv().await.context("receive second notification")?;
match message {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(received),
) => assert_eq!(received, snapshot_two),
other => panic!("unexpected message: {other:?}"),
}
// Re-emitting the same snapshot, even if a different conversation produced it,
// should be suppressed by the global tracker.
apply_rate_limit_notification(event_two, outgoing, latest_rate_limits).await;
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
Ok(())
}
}

View File

@@ -13,9 +13,12 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
async fn get_user_agent_returns_current_codex_user_agent() {
let codex_home = TempDir::new().unwrap_or_else(|err| panic!("create tempdir: {err}"));
let mut mcp = McpProcess::new(codex_home.path())
.await
.expect("spawn mcp process");
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[("CODEX_INTERNAL_ORIGINATOR_OVERRIDE", Some("codex_cli_rs"))],
)
.await
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")

View File

@@ -27,7 +27,7 @@ At a glance:
- Approvals (server → client requests)
- `applyPatchApproval`, `execCommandApproval`
- Notifications (server → client)
- `loginChatGptComplete`, `authStatusChange`
- `loginChatGptComplete`, `authStatusChange`, `account/rateLimits/updated`
- `codex/event` stream with agent events
See code for full type definitions and exact shapes: `protocol/src/mcp_protocol.rs`.
@@ -97,6 +97,7 @@ Each response yields:
While a conversation runs, the server sends notifications:
- `codex/event` with the serialized Codex event payload. The shape matches `core/src/protocol.rs`s `Event` and `EventMsg` types. Some notifications include a `_meta.requestId` to correlate with the originating request.
- `account/rateLimits/updated` whenever Codex observes a new rate-limit snapshot for the active account. The payload matches `RateLimitSnapshot`.
- Auth notifications via method names `loginChatGptComplete` and `authStatusChange`.
Clients should render events and, when present, surface approval requests (see next section).