simplify server request tracking state

This commit is contained in:
Roy Han
2026-04-23 17:51:48 -04:00
parent c092aa338c
commit 82c12ae6bf
2 changed files with 11 additions and 28 deletions

View File

@@ -57,11 +57,15 @@ pub struct AnalyticsEventsClient {
impl AnalyticsEventsQueue {
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
let auth_manager = Arc::downgrade(&auth_manager);
tokio::spawn(async move {
let mut reducer = AnalyticsReducer::default();
while let Some(input) = receiver.recv().await {
let mut events = Vec::new();
reducer.ingest(input, &mut events).await;
let Some(auth_manager) = auth_manager.upgrade() else {
break;
};
send_track_events(&auth_manager, &base_url, events).await;
}
});

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
@@ -133,7 +132,7 @@ struct PendingCallbackEntry {
callback: oneshot::Sender<ClientRequestResult>,
thread_id: Option<ThreadId>,
request: ServerRequest,
tracked_request_connection_ids: HashSet<ConnectionId>,
track_server_response: bool,
}
impl ThreadScopedOutgoingMessageSender {
@@ -302,7 +301,7 @@ impl OutgoingMessageSender {
callback: tx_approve,
thread_id,
request: request.clone(),
tracked_request_connection_ids: HashSet::new(),
track_server_response: connection_ids.is_some(),
},
);
}
@@ -331,8 +330,8 @@ impl OutgoingMessageSender {
send_error = Some(err);
break;
} else {
self.track_pending_server_request(*connection_id, &request)
.await;
self.analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
match send_error {
@@ -368,30 +367,12 @@ impl OutgoingMessageSender {
{
warn!("failed to resend request to client: {err:?}");
} else {
self.track_pending_server_request(connection_id, &request)
.await;
self.analytics_events_client
.track_server_request(connection_id.0, request);
}
}
}
async fn track_pending_server_request(
&self,
connection_id: ConnectionId,
request: &ServerRequest,
) {
let request_id = request.id().clone();
let should_track = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback
.get_mut(&request_id)
.is_some_and(|entry| entry.tracked_request_connection_ids.insert(connection_id))
};
if should_track {
self.analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
pub(crate) async fn notify_client_response(
&self,
connection_id: ConnectionId,
@@ -402,9 +383,7 @@ impl OutgoingMessageSender {
match entry {
Some((id, entry)) => {
if entry
.tracked_request_connection_ids
.contains(&connection_id)
if entry.track_server_response
&& let Ok(response) = entry.request.response_from_result(&result)
{
self.analytics_events_client