Files
codex/codex-rs/analytics/src/client.rs
rhan-oai 99016ec732 [codex-analytics] plumb protocol-native review timing (#21434)
## Why

We want terminal tool review analytics, but the reducer should not stamp
review timing from its own wall clock.

This PR plumbs review timing through the real protocol and app-server
seams so downstream analytics can consume the emitter's timestamps
directly. Guardian reviews keep their enriched `started_at` /
`completed_at` analytics fields by deriving those legacy second-based
values from the same protocol-native millisecond lifecycle timestamps,
rather than sampling a separate analytics clock.

## What changed

- add `started_at_ms` to user approval request payloads
- add `started_at_ms` / `completed_at_ms` to guardian review
notifications
- preserve Guardian review `started_at` / `completed_at` enrichment from
the protocol-native timing source
- stamp typed `ServerResponse` analytics facts with app-server-observed
`completed_at_ms`
- thread the new timing fields through core, protocol, app-server, TUI,
and analytics fixtures

## Verification

- `cargo test -p codex-app-server outgoing_message --manifest-path
codex-rs/Cargo.toml`
- `cargo test -p codex-app-server-protocol guardian --manifest-path
codex-rs/Cargo.toml`
- `cargo test -p codex-tui guardian --manifest-path codex-rs/Cargo.toml`
- `cargo test -p codex-analytics analytics_client_tests --manifest-path
codex-rs/Cargo.toml`

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/21434).
* #18748
* __->__ #21434
* #18747
* #17090
* #17089
* #20514
2026-05-07 20:31:41 -07:00

410 lines
13 KiB
Rust

use crate::events::AppServerRpcTransport;
use crate::events::GuardianReviewAnalyticsResult;
use crate::events::GuardianReviewTrackContext;
use crate::events::TrackEventRequest;
use crate::events::TrackEventsRequest;
use crate::events::current_runtime_metadata;
use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::HookRunFact;
use crate::facts::HookRunInput;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
use crate::facts::SkillInvocation;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::mpsc;
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
#[derive(Clone)]
pub(crate) struct AnalyticsEventsQueue {
pub(crate) sender: mpsc::Sender<AnalyticsFact>,
pub(crate) app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
pub(crate) plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
}
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: Option<AnalyticsEventsQueue>,
}
impl AnalyticsEventsQueue {
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
tokio::spawn(async move {
let mut reducer = AnalyticsReducer::default();
while let Some(input) = receiver.recv().await {
let mut events = Vec::new();
reducer.ingest(input, &mut events).await;
send_track_events(&auth_manager, &base_url, events).await;
}
});
Self {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
}
}
fn try_send(&self, input: AnalyticsFact) {
if self.sender.try_send(input).is_err() {
//TODO: add a metric for this
tracing::warn!("dropping analytics events: queue is full");
}
}
pub(crate) fn should_enqueue_app_used(
&self,
tracking: &TrackEventsContext,
app: &AppInvocation,
) -> bool {
let Some(connector_id) = app.connector_id.as_ref() else {
return true;
};
let mut emitted = self
.app_used_emitted_keys
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
emitted.clear();
}
emitted.insert((tracking.turn_id.clone(), connector_id.clone()))
}
pub(crate) fn should_enqueue_plugin_used(
&self,
tracking: &TrackEventsContext,
plugin: &PluginTelemetryMetadata,
) -> bool {
let mut emitted = self
.plugin_used_emitted_keys
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
emitted.clear();
}
emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key()))
}
}
impl AnalyticsEventsClient {
pub fn new(
auth_manager: Arc<AuthManager>,
base_url: String,
analytics_enabled: Option<bool>,
) -> Self {
Self {
queue: (analytics_enabled != Some(false))
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
}
}
pub fn disabled() -> Self {
Self { queue: None }
}
pub fn track_skill_invocations(
&self,
tracking: TrackEventsContext,
invocations: Vec<SkillInvocation>,
) {
if invocations.is_empty() {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
SkillInvokedInput {
tracking,
invocations,
},
)));
}
pub fn track_initialize(
&self,
connection_id: u64,
params: InitializeParams,
product_client_id: String,
rpc_transport: AppServerRpcTransport,
) {
self.record_fact(AnalyticsFact::Initialize {
connection_id,
params,
product_client_id,
runtime: current_runtime_metadata(),
rpc_transport,
});
}
pub fn track_subagent_thread_started(&self, input: SubAgentThreadStartedInput) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::SubAgentThreadStarted(input),
));
}
pub fn track_guardian_review(
&self,
tracking: &GuardianReviewTrackContext,
result: GuardianReviewAnalyticsResult,
) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(
Box::new(tracking.event_params(result)),
)));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
if mentions.is_empty() {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
AppMentionedInput { tracking, mentions },
)));
}
pub fn track_request(
&self,
connection_id: u64,
request_id: RequestId,
request: &ClientRequest,
) {
if !matches!(
request,
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
) {
return;
}
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request.clone()),
});
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
AppUsedInput { tracking, app },
)));
}
pub fn track_hook_run(&self, tracking: TrackEventsContext, hook: HookRunFact) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(
HookRunInput { tracking, hook },
)));
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
crate::facts::PluginUsedInput { tracking, plugin },
)));
}
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
Box::new(event),
)));
}
pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)),
));
}
pub fn track_turn_token_usage(&self, fact: TurnTokenUsageFact) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(
Box::new(fact),
)));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Installed,
}),
));
}
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Uninstalled,
}),
));
}
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Enabled,
}),
));
}
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Disabled,
}),
));
}
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
if let Some(queue) = self.queue.as_ref() {
queue.try_send(input);
}
}
pub fn track_response(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
) {
if !matches!(
response,
ClientResponsePayload::ThreadStart(_)
| ClientResponsePayload::ThreadResume(_)
| ClientResponsePayload::ThreadFork(_)
| ClientResponsePayload::TurnStart(_)
| ClientResponsePayload::TurnSteer(_)
) {
return;
}
self.record_fact(AnalyticsFact::ClientResponse {
connection_id,
request_id,
response: Box::new(response),
});
}
pub fn track_error_response(
&self,
connection_id: u64,
request_id: RequestId,
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
) {
self.record_fact(AnalyticsFact::ErrorResponse {
connection_id,
request_id,
error,
error_type,
});
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}
pub fn track_server_response(&self, completed_at_ms: u64, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
completed_at_ms,
response: Box::new(response),
});
}
pub fn track_notification(&self, notification: ServerNotification) {
if !matches!(
notification,
ServerNotification::TurnStarted(_)
| ServerNotification::TurnCompleted(_)
| ServerNotification::ItemStarted(_)
| ServerNotification::ItemCompleted(_)
| ServerNotification::ItemGuardianApprovalReviewStarted(_)
| ServerNotification::ItemGuardianApprovalReviewCompleted(_)
) {
return;
}
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
}
async fn send_track_events(
auth_manager: &AuthManager,
base_url: &str,
events: Vec<TrackEventRequest>,
) {
if events.is_empty() {
return;
}
let Some(auth) = auth_manager.auth().await else {
return;
};
if !auth.uses_codex_backend() {
return;
}
let base_url = base_url.trim_end_matches('/');
let url = format!("{base_url}/codex/analytics-events/events");
let payload = TrackEventsRequest { events };
let response = create_client()
.post(&url)
.timeout(ANALYTICS_EVENTS_TIMEOUT)
.headers(codex_model_provider::auth_provider_from_auth(&auth).to_auth_headers())
.header("Content-Type", "application/json")
.json(&payload)
.send()
.await;
match response {
Ok(response) if response.status().is_success() => {}
Ok(response) => {
let status = response.status();
let body = response.text().await.unwrap_or_default();
tracing::warn!("events failed with status {status}: {body}");
}
Err(err) => {
tracing::warn!("failed to send events request: {err}");
}
}
}
#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;