mirror of
https://github.com/openai/codex.git
synced 2026-05-15 16:53:05 +00:00
## Why Review telemetry should describe reviews as first-class events, not only as counters denormalized onto terminal tool-item events. That lets us analyze guardian and user reviews consistently across command execution, file changes, permissions, and network access, while still preserving the terminal item summaries that existing tool analytics need. To make those review events accurate, analytics also needs the observed completion time for each review and enough command metadata to distinguish `shell` from `unified_exec` reviews. ## What changed - emit generic `codex_review_event` rows for completed user and guardian reviews, with review subjects, reviewer, trigger, terminal status, resolution, and observed duration - reduce approval request / response / abort facts into review events for command execution, file change, and permissions flows - keep denormalized review counts, final approval outcome, and permission-request flags on terminal tool-item events for item-associated reviews - plumb review completion timing so user-review responses and aborts use app-server-observed completion times, while guardian analytics reuse the same terminal timestamps emitted on guardian assessment events - carry command approval `source` through the protocol and app-server layers so review analytics can distinguish `shell` from `unified_exec` - add analytics coverage for user-review emission, guardian-review emission, permission reviews that should not denormalize onto tool items, item-summary isolation across threads, and the serialized review-event shape ## Verification - `cargo test -p codex-analytics` --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/18748). * __->__ #18748 * #21434 * #18747 * #17090 * #17089 * #20514
468 lines
14 KiB
Rust
468 lines
14 KiB
Rust
use crate::events::AppServerRpcTransport;
|
|
use crate::events::GuardianReviewAnalyticsResult;
|
|
use crate::events::GuardianReviewTrackContext;
|
|
use crate::events::TrackEventRequest;
|
|
use crate::events::TrackEventsRequest;
|
|
use crate::events::current_runtime_metadata;
|
|
use crate::facts::AnalyticsFact;
|
|
use crate::facts::AnalyticsJsonRpcError;
|
|
use crate::facts::AppInvocation;
|
|
use crate::facts::AppMentionedInput;
|
|
use crate::facts::AppUsedInput;
|
|
use crate::facts::CustomAnalyticsFact;
|
|
use crate::facts::HookRunFact;
|
|
use crate::facts::HookRunInput;
|
|
use crate::facts::PluginState;
|
|
use crate::facts::PluginStateChangedInput;
|
|
use crate::facts::SkillInvocation;
|
|
use crate::facts::SkillInvokedInput;
|
|
use crate::facts::SubAgentThreadStartedInput;
|
|
use crate::facts::TrackEventsContext;
|
|
use crate::facts::TurnResolvedConfigFact;
|
|
use crate::facts::TurnTokenUsageFact;
|
|
use crate::reducer::AnalyticsReducer;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::ClientResponsePayload;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_app_server_protocol::ServerResponse;
|
|
use codex_login::AuthManager;
|
|
use codex_login::CodexAuth;
|
|
use codex_login::default_client::create_client;
|
|
use codex_plugin::PluginTelemetryMetadata;
|
|
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
|
use std::collections::HashSet;
|
|
use std::sync::Arc;
|
|
use std::sync::Mutex;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
|
|
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
|
|
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
|
|
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct AnalyticsEventsQueue {
|
|
pub(crate) sender: mpsc::Sender<AnalyticsFact>,
|
|
pub(crate) app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
|
pub(crate) plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct AnalyticsEventsClient {
|
|
queue: Option<AnalyticsEventsQueue>,
|
|
}
|
|
|
|
impl AnalyticsEventsQueue {
|
|
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
|
|
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
|
|
tokio::spawn(async move {
|
|
let mut reducer = AnalyticsReducer::default();
|
|
while let Some(input) = receiver.recv().await {
|
|
let mut events = Vec::new();
|
|
reducer.ingest(input, &mut events).await;
|
|
send_track_events(&auth_manager, &base_url, events).await;
|
|
}
|
|
});
|
|
Self {
|
|
sender,
|
|
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
|
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
|
}
|
|
}
|
|
|
|
fn try_send(&self, input: AnalyticsFact) {
|
|
if self.sender.try_send(input).is_err() {
|
|
//TODO: add a metric for this
|
|
tracing::warn!("dropping analytics events: queue is full");
|
|
}
|
|
}
|
|
|
|
pub(crate) fn should_enqueue_app_used(
|
|
&self,
|
|
tracking: &TrackEventsContext,
|
|
app: &AppInvocation,
|
|
) -> bool {
|
|
let Some(connector_id) = app.connector_id.as_ref() else {
|
|
return true;
|
|
};
|
|
let mut emitted = self
|
|
.app_used_emitted_keys
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
|
emitted.clear();
|
|
}
|
|
emitted.insert((tracking.turn_id.clone(), connector_id.clone()))
|
|
}
|
|
|
|
pub(crate) fn should_enqueue_plugin_used(
|
|
&self,
|
|
tracking: &TrackEventsContext,
|
|
plugin: &PluginTelemetryMetadata,
|
|
) -> bool {
|
|
let mut emitted = self
|
|
.plugin_used_emitted_keys
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
|
emitted.clear();
|
|
}
|
|
emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key()))
|
|
}
|
|
}
|
|
|
|
impl AnalyticsEventsClient {
|
|
pub fn new(
|
|
auth_manager: Arc<AuthManager>,
|
|
base_url: String,
|
|
analytics_enabled: Option<bool>,
|
|
) -> Self {
|
|
Self {
|
|
queue: (analytics_enabled != Some(false))
|
|
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
|
|
}
|
|
}
|
|
|
|
pub fn disabled() -> Self {
|
|
Self { queue: None }
|
|
}
|
|
|
|
pub fn track_skill_invocations(
|
|
&self,
|
|
tracking: TrackEventsContext,
|
|
invocations: Vec<SkillInvocation>,
|
|
) {
|
|
if invocations.is_empty() {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
|
|
SkillInvokedInput {
|
|
tracking,
|
|
invocations,
|
|
},
|
|
)));
|
|
}
|
|
|
|
pub fn track_initialize(
|
|
&self,
|
|
connection_id: u64,
|
|
params: InitializeParams,
|
|
product_client_id: String,
|
|
rpc_transport: AppServerRpcTransport,
|
|
) {
|
|
self.record_fact(AnalyticsFact::Initialize {
|
|
connection_id,
|
|
params,
|
|
product_client_id,
|
|
runtime: current_runtime_metadata(),
|
|
rpc_transport,
|
|
});
|
|
}
|
|
|
|
pub fn track_subagent_thread_started(&self, input: SubAgentThreadStartedInput) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::SubAgentThreadStarted(input),
|
|
));
|
|
}
|
|
|
|
pub fn track_guardian_review(
|
|
&self,
|
|
tracking: &GuardianReviewTrackContext,
|
|
result: GuardianReviewAnalyticsResult,
|
|
completed_at_ms: u64,
|
|
) {
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(
|
|
Box::new(tracking.event_params(result, completed_at_ms)),
|
|
)));
|
|
}
|
|
|
|
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
|
|
if mentions.is_empty() {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
|
|
AppMentionedInput { tracking, mentions },
|
|
)));
|
|
}
|
|
|
|
pub fn track_request(
|
|
&self,
|
|
connection_id: u64,
|
|
request_id: RequestId,
|
|
request: &ClientRequest,
|
|
) {
|
|
if !matches!(
|
|
request,
|
|
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
|
|
) {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::ClientRequest {
|
|
connection_id,
|
|
request_id,
|
|
request: Box::new(request.clone()),
|
|
});
|
|
}
|
|
|
|
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
|
let Some(queue) = self.queue.as_ref() else {
|
|
return;
|
|
};
|
|
if !queue.should_enqueue_app_used(&tracking, &app) {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
|
AppUsedInput { tracking, app },
|
|
)));
|
|
}
|
|
|
|
pub fn track_hook_run(&self, tracking: TrackEventsContext, hook: HookRunFact) {
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(
|
|
HookRunInput { tracking, hook },
|
|
)));
|
|
}
|
|
|
|
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
|
let Some(queue) = self.queue.as_ref() else {
|
|
return;
|
|
};
|
|
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
|
crate::facts::PluginUsedInput { tracking, plugin },
|
|
)));
|
|
}
|
|
|
|
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
|
|
Box::new(event),
|
|
)));
|
|
}
|
|
|
|
pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)),
|
|
));
|
|
}
|
|
|
|
pub fn track_turn_token_usage(&self, fact: TurnTokenUsageFact) {
|
|
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(
|
|
Box::new(fact),
|
|
)));
|
|
}
|
|
|
|
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
|
plugin,
|
|
state: PluginState::Installed,
|
|
}),
|
|
));
|
|
}
|
|
|
|
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
|
plugin,
|
|
state: PluginState::Uninstalled,
|
|
}),
|
|
));
|
|
}
|
|
|
|
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
|
plugin,
|
|
state: PluginState::Enabled,
|
|
}),
|
|
));
|
|
}
|
|
|
|
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
|
|
self.record_fact(AnalyticsFact::Custom(
|
|
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
|
plugin,
|
|
state: PluginState::Disabled,
|
|
}),
|
|
));
|
|
}
|
|
|
|
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
|
|
if let Some(queue) = self.queue.as_ref() {
|
|
queue.try_send(input);
|
|
}
|
|
}
|
|
|
|
pub fn track_response(
|
|
&self,
|
|
connection_id: u64,
|
|
request_id: RequestId,
|
|
response: ClientResponsePayload,
|
|
) {
|
|
if !matches!(
|
|
response,
|
|
ClientResponsePayload::ThreadStart(_)
|
|
| ClientResponsePayload::ThreadResume(_)
|
|
| ClientResponsePayload::ThreadFork(_)
|
|
| ClientResponsePayload::TurnStart(_)
|
|
| ClientResponsePayload::TurnSteer(_)
|
|
) {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::ClientResponse {
|
|
connection_id,
|
|
request_id,
|
|
response: Box::new(response),
|
|
});
|
|
}
|
|
|
|
pub fn track_error_response(
|
|
&self,
|
|
connection_id: u64,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
error_type: Option<AnalyticsJsonRpcError>,
|
|
) {
|
|
self.record_fact(AnalyticsFact::ErrorResponse {
|
|
connection_id,
|
|
request_id,
|
|
error,
|
|
error_type,
|
|
});
|
|
}
|
|
|
|
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
|
|
self.record_fact(AnalyticsFact::ServerRequest {
|
|
connection_id,
|
|
request: Box::new(request),
|
|
});
|
|
}
|
|
|
|
pub fn track_server_response(&self, completed_at_ms: u64, response: ServerResponse) {
|
|
self.record_fact(AnalyticsFact::ServerResponse {
|
|
completed_at_ms,
|
|
response: Box::new(response),
|
|
});
|
|
}
|
|
|
|
pub fn track_effective_permissions_approval_response(
|
|
&self,
|
|
completed_at_ms: u64,
|
|
request_id: RequestId,
|
|
response: RequestPermissionsResponse,
|
|
) {
|
|
self.record_fact(AnalyticsFact::EffectivePermissionsApprovalResponse {
|
|
completed_at_ms,
|
|
request_id,
|
|
response: Box::new(response),
|
|
});
|
|
}
|
|
|
|
pub fn track_server_request_aborted(&self, completed_at_ms: u64, request_id: RequestId) {
|
|
self.record_fact(AnalyticsFact::ServerRequestAborted {
|
|
completed_at_ms,
|
|
request_id,
|
|
});
|
|
}
|
|
|
|
pub fn track_notification(&self, notification: ServerNotification) {
|
|
if !matches!(
|
|
notification,
|
|
ServerNotification::TurnStarted(_)
|
|
| ServerNotification::TurnCompleted(_)
|
|
| ServerNotification::TurnDiffUpdated(_)
|
|
| ServerNotification::ItemStarted(_)
|
|
| ServerNotification::ItemCompleted(_)
|
|
| ServerNotification::ItemGuardianApprovalReviewStarted(_)
|
|
| ServerNotification::ItemGuardianApprovalReviewCompleted(_)
|
|
) {
|
|
return;
|
|
}
|
|
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
|
|
}
|
|
}
|
|
|
|
async fn send_track_events(
|
|
auth_manager: &AuthManager,
|
|
base_url: &str,
|
|
events: Vec<TrackEventRequest>,
|
|
) {
|
|
if events.is_empty() {
|
|
return;
|
|
}
|
|
|
|
let Some(auth) = auth_manager.auth().await else {
|
|
return;
|
|
};
|
|
if !auth.uses_codex_backend() {
|
|
return;
|
|
}
|
|
|
|
let base_url = base_url.trim_end_matches('/');
|
|
let url = format!("{base_url}/codex/analytics-events/events");
|
|
for events in track_event_request_batches(events) {
|
|
send_track_events_request(&auth, &url, events).await;
|
|
}
|
|
}
|
|
|
|
fn track_event_request_batches(events: Vec<TrackEventRequest>) -> Vec<Vec<TrackEventRequest>> {
|
|
let mut batches = Vec::new();
|
|
let mut current_batch = Vec::new();
|
|
|
|
for event in events {
|
|
if event.should_send_in_isolated_request() {
|
|
if !current_batch.is_empty() {
|
|
batches.push(current_batch);
|
|
current_batch = Vec::new();
|
|
}
|
|
batches.push(vec![event]);
|
|
} else {
|
|
current_batch.push(event);
|
|
}
|
|
}
|
|
|
|
if !current_batch.is_empty() {
|
|
batches.push(current_batch);
|
|
}
|
|
|
|
batches
|
|
}
|
|
|
|
async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec<TrackEventRequest>) {
|
|
if events.is_empty() {
|
|
return;
|
|
}
|
|
|
|
let payload = TrackEventsRequest { events };
|
|
|
|
let response = create_client()
|
|
.post(url)
|
|
.timeout(ANALYTICS_EVENTS_TIMEOUT)
|
|
.headers(codex_model_provider::auth_provider_from_auth(auth).to_auth_headers())
|
|
.header("Content-Type", "application/json")
|
|
.json(&payload)
|
|
.send()
|
|
.await;
|
|
|
|
match response {
|
|
Ok(response) if response.status().is_success() => {}
|
|
Ok(response) => {
|
|
let status = response.status();
|
|
let body = response.text().await.unwrap_or_default();
|
|
tracing::warn!("events failed with status {status}: {body}");
|
|
}
|
|
Err(err) => {
|
|
tracing::warn!("failed to send events request: {err}");
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
#[path = "client_tests.rs"]
|
|
mod tests;
|