diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index 10a362e68f..d018d9912e 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -57,11 +57,15 @@ pub struct AnalyticsEventsClient { impl AnalyticsEventsQueue { pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); + let auth_manager = Arc::downgrade(&auth_manager); tokio::spawn(async move { let mut reducer = AnalyticsReducer::default(); while let Some(input) = receiver.recv().await { let mut events = Vec::new(); reducer.ingest(input, &mut events).await; + let Some(auth_manager) = auth_manager.upgrade() else { + break; + }; send_track_events(&auth_manager, &base_url, events).await; } }); diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index de7d810f2f..2a19564720 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::sync::atomic::AtomicI64; @@ -133,7 +132,7 @@ struct PendingCallbackEntry { callback: oneshot::Sender, thread_id: Option, request: ServerRequest, - tracked_request_connection_ids: HashSet, + track_server_response: bool, } impl ThreadScopedOutgoingMessageSender { @@ -302,7 +301,7 @@ impl OutgoingMessageSender { callback: tx_approve, thread_id, request: request.clone(), - tracked_request_connection_ids: HashSet::new(), + track_server_response: connection_ids.is_some(), }, ); } @@ -331,8 +330,8 @@ impl OutgoingMessageSender { send_error = Some(err); break; } else { - self.track_pending_server_request(*connection_id, &request) - .await; + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -368,30 +367,12 @@ impl OutgoingMessageSender { { warn!("failed to resend request to client: {err:?}"); } else { - self.track_pending_server_request(connection_id, &request) - .await; + self.analytics_events_client + .track_server_request(connection_id.0, request); } } } - async fn track_pending_server_request( - &self, - connection_id: ConnectionId, - request: &ServerRequest, - ) { - let request_id = request.id().clone(); - let should_track = { - let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback - .get_mut(&request_id) - .is_some_and(|entry| entry.tracked_request_connection_ids.insert(connection_id)) - }; - if should_track { - self.analytics_events_client - .track_server_request(connection_id.0, request.clone()); - } - } - pub(crate) async fn notify_client_response( &self, connection_id: ConnectionId, @@ -402,9 +383,7 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { - if entry - .tracked_request_connection_ids - .contains(&connection_id) + if entry.track_server_response && let Ok(response) = entry.request.response_from_result(&result) { self.analytics_events_client