use codex_git_utils::collect_git_info; use codex_git_utils::get_git_repo_root; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_login::default_client::originator; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SkillScope; use codex_protocol::protocol::SubAgentSource; use serde::Serialize; use sha1::Digest; use sha1::Sha1; use std::collections::HashMap; use std::collections::HashSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; use tokio::sync::mpsc; #[derive(Clone)] pub struct TrackEventsContext { pub model_slug: String, pub thread_id: String, pub turn_id: String, } #[derive(Clone)] pub struct CodexThreadInitializedEvent { pub thread_id: String, pub model: String, pub ephemeral: bool, pub session_source: SessionSource, pub initialization_mode: InitializationMode, pub subagent_source: Option, pub parent_thread_id: Option, pub created_at: u64, } #[derive(Clone)] pub struct CodexThreadInitializedInput { pub thread_id: String, pub model: String, pub product_client_id: String, pub created_at: u64, pub thread_context: CodexThreadContext, } #[derive(Clone)] pub struct CodexThreadContext { pub ephemeral: bool, pub session_source: SessionSource, pub initialization_mode: InitializationMode, pub subagent_source: Option, pub parent_thread_id: Option, } #[derive(Clone, Copy, Debug, Serialize)] #[serde(rename_all = "snake_case")] pub enum InitializationMode { New, Forked, Resumed, } pub fn build_track_events_context( model_slug: String, thread_id: String, turn_id: String, ) -> TrackEventsContext { TrackEventsContext { model_slug, thread_id, turn_id, } } #[derive(Clone, Debug)] pub struct SkillInvocation { pub skill_name: String, pub skill_scope: SkillScope, pub skill_path: PathBuf, pub invocation_type: InvocationType, } #[derive(Clone, Copy, Debug, Serialize)] #[serde(rename_all = "lowercase")] pub enum InvocationType { Explicit, Implicit, } pub struct AppInvocation { pub connector_id: Option, pub app_name: Option, pub invocation_type: Option, } pub enum AnalyticsInput { ThreadInitialized(CodexThreadInitializedInput), SkillInvoked(SkillInvokedInput), AppMentioned(AppMentionedInput), AppUsed(AppUsedInput), PluginUsed(PluginUsedInput), PluginStateChanged(PluginStateChangedInput), } pub struct SkillInvokedInput { pub tracking: TrackEventsContext, pub invocations: Vec, } pub struct AppMentionedInput { pub tracking: TrackEventsContext, pub mentions: Vec, } pub struct AppUsedInput { pub tracking: TrackEventsContext, pub app: AppInvocation, } pub struct PluginUsedInput { pub tracking: TrackEventsContext, pub plugin: PluginTelemetryMetadata, } pub struct PluginStateChangedInput { pub plugin: PluginTelemetryMetadata, pub state: PluginState, } #[derive(Clone, Copy)] pub enum PluginState { Installed, Uninstalled, Enabled, Disabled, } #[derive(Default)] pub struct AnalyticsReducer { threads: HashMap, } struct ThreadState { _initialized_event: CodexThreadInitializedEvent, } #[derive(Clone)] pub(crate) struct AnalyticsEventsQueue { sender: mpsc::Sender, app_used_emitted_keys: Arc>>, plugin_used_emitted_keys: Arc>>, } #[derive(Clone)] pub struct AnalyticsEventsClient { queue: AnalyticsEventsQueue, analytics_enabled: Option, } impl AnalyticsEventsQueue { pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); tokio::spawn(async move { let mut reducer = AnalyticsReducer::default(); while let Some(job) = receiver.recv().await { let mut events = Vec::new(); reducer.ingest(job, &mut events).await; send_track_events(&auth_manager, &base_url, events).await; } }); Self { sender, app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), } } fn try_send(&self, input: AnalyticsInput) { if self.sender.try_send(input).is_err() { //TODO: add a metric for this tracing::warn!("dropping analytics events: queue is full"); } } fn should_enqueue_app_used(&self, tracking: &TrackEventsContext, app: &AppInvocation) -> bool { let Some(connector_id) = app.connector_id.as_ref() else { return true; }; let mut emitted = self .app_used_emitted_keys .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { emitted.clear(); } emitted.insert((tracking.turn_id.clone(), connector_id.clone())) } fn should_enqueue_plugin_used( &self, tracking: &TrackEventsContext, plugin: &PluginTelemetryMetadata, ) -> bool { let mut emitted = self .plugin_used_emitted_keys .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { emitted.clear(); } emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key())) } } impl AnalyticsEventsClient { pub fn new( auth_manager: Arc, base_url: String, analytics_enabled: Option, ) -> Self { Self { queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url), analytics_enabled, } } pub fn track_skill_invocations( &self, tracking: TrackEventsContext, invocations: Vec, ) { if invocations.is_empty() { return; } self.record(AnalyticsInput::SkillInvoked(SkillInvokedInput { tracking, invocations, })); } pub fn track_thread_initialized(&self, input: CodexThreadInitializedInput) { self.record(AnalyticsInput::ThreadInitialized(input)); } pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec) { if mentions.is_empty() { return; } self.record(AnalyticsInput::AppMentioned(AppMentionedInput { tracking, mentions, })); } pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { if !self.queue.should_enqueue_app_used(&tracking, &app) { return; } self.record(AnalyticsInput::AppUsed(AppUsedInput { tracking, app })); } pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { return; } self.record(AnalyticsInput::PluginUsed(PluginUsedInput { tracking, plugin, })); } pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) { self.record(AnalyticsInput::PluginStateChanged( PluginStateChangedInput { plugin, state: PluginState::Installed, }, )); } pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) { self.record(AnalyticsInput::PluginStateChanged( PluginStateChangedInput { plugin, state: PluginState::Uninstalled, }, )); } pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) { self.record(AnalyticsInput::PluginStateChanged( PluginStateChangedInput { plugin, state: PluginState::Enabled, }, )); } pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) { self.record(AnalyticsInput::PluginStateChanged( PluginStateChangedInput { plugin, state: PluginState::Disabled, }, )); } pub fn record(&self, input: AnalyticsInput) { if self.analytics_enabled == Some(false) { return; } self.queue.try_send(input); } } const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256; const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10); const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096; #[derive(Serialize)] struct TrackEventsRequest { events: Vec, } #[derive(Serialize)] #[serde(untagged)] enum TrackEventRequest { SkillInvocation(SkillInvocationEventRequest), ThreadInitialized(CodexThreadInitializedEventRequest), AppMentioned(CodexAppMentionedEventRequest), AppUsed(CodexAppUsedEventRequest), PluginUsed(CodexPluginUsedEventRequest), PluginInstalled(CodexPluginEventRequest), PluginUninstalled(CodexPluginEventRequest), PluginEnabled(CodexPluginEventRequest), PluginDisabled(CodexPluginEventRequest), } #[derive(Serialize)] struct SkillInvocationEventRequest { event_type: &'static str, skill_id: String, skill_name: String, event_params: SkillInvocationEventParams, } #[derive(Serialize)] struct SkillInvocationEventParams { product_client_id: Option, skill_scope: Option, repo_url: Option, thread_id: Option, invoke_type: Option, model_slug: Option, } #[derive(Serialize)] struct CodexThreadInitializedEventParams { thread_id: String, product_client_id: String, model: String, ephemeral: bool, session_source: Option<&'static str>, initialization_mode: InitializationMode, subagent_source: Option, parent_thread_id: Option, created_at: u64, } #[derive(Serialize)] struct CodexThreadInitializedEventRequest { event_type: &'static str, event_params: CodexThreadInitializedEventParams, } #[derive(Serialize)] struct CodexAppMetadata { connector_id: Option, thread_id: Option, turn_id: Option, app_name: Option, product_client_id: Option, invoke_type: Option, model_slug: Option, } #[derive(Serialize)] struct CodexAppMentionedEventRequest { event_type: &'static str, event_params: CodexAppMetadata, } #[derive(Serialize)] struct CodexAppUsedEventRequest { event_type: &'static str, event_params: CodexAppMetadata, } #[derive(Serialize)] struct CodexPluginMetadata { plugin_id: Option, plugin_name: Option, marketplace_name: Option, has_skills: Option, mcp_server_count: Option, connector_ids: Option>, product_client_id: Option, } #[derive(Serialize)] struct CodexPluginUsedMetadata { #[serde(flatten)] plugin: CodexPluginMetadata, thread_id: Option, turn_id: Option, model_slug: Option, } #[derive(Serialize)] struct CodexPluginEventRequest { event_type: &'static str, event_params: CodexPluginMetadata, } #[derive(Serialize)] struct CodexPluginUsedEventRequest { event_type: &'static str, event_params: CodexPluginUsedMetadata, } impl AnalyticsReducer { async fn ingest(&mut self, input: AnalyticsInput, out: &mut Vec) { match input { AnalyticsInput::ThreadInitialized(input) => { self.ingest_thread_initialized(input, out); } AnalyticsInput::SkillInvoked(input) => { self.ingest_skill_invoked(input, out).await; } AnalyticsInput::AppMentioned(input) => { self.ingest_app_mentioned(input, out); } AnalyticsInput::AppUsed(input) => { self.ingest_app_used(input, out); } AnalyticsInput::PluginUsed(input) => { self.ingest_plugin_used(input, out); } AnalyticsInput::PluginStateChanged(input) => { self.ingest_plugin_state_changed(input, out); } } } fn ingest_thread_initialized( &mut self, input: CodexThreadInitializedInput, out: &mut Vec, ) { let product_client_id = input.product_client_id.clone(); let event = CodexThreadInitializedEvent { thread_id: input.thread_id, model: input.model, ephemeral: input.thread_context.ephemeral, session_source: input.thread_context.session_source, initialization_mode: input.thread_context.initialization_mode, subagent_source: input.thread_context.subagent_source, parent_thread_id: input.thread_context.parent_thread_id, created_at: input.created_at, }; self.threads.insert( event.thread_id.clone(), ThreadState { _initialized_event: event.clone(), }, ); out.push(TrackEventRequest::ThreadInitialized( codex_thread_initialized_event_request(product_client_id, event), )); } async fn ingest_skill_invoked( &mut self, input: SkillInvokedInput, out: &mut Vec, ) { let SkillInvokedInput { tracking, invocations, } = input; for invocation in invocations { let skill_scope = match invocation.skill_scope { SkillScope::User => "user", SkillScope::Repo => "repo", SkillScope::System => "system", SkillScope::Admin => "admin", }; let repo_root = get_git_repo_root(invocation.skill_path.as_path()); let repo_url = if let Some(root) = repo_root.as_ref() { collect_git_info(root) .await .and_then(|info| info.repository_url) } else { None }; let skill_id = skill_id_for_local_skill( repo_url.as_deref(), repo_root.as_deref(), invocation.skill_path.as_path(), invocation.skill_name.as_str(), ); out.push(TrackEventRequest::SkillInvocation( SkillInvocationEventRequest { event_type: "skill_invocation", skill_id, skill_name: invocation.skill_name.clone(), event_params: SkillInvocationEventParams { thread_id: Some(tracking.thread_id.clone()), invoke_type: Some(invocation.invocation_type), model_slug: Some(tracking.model_slug.clone()), product_client_id: Some(originator().value), repo_url, skill_scope: Some(skill_scope.to_string()), }, }, )); } } fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec) { let AppMentionedInput { tracking, mentions } = input; out.extend(mentions.into_iter().map(|mention| { let event_params = codex_app_metadata(&tracking, mention); TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest { event_type: "codex_app_mentioned", event_params, }) })); } fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec) { let AppUsedInput { tracking, app } = input; let event_params = codex_app_metadata(&tracking, app); out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest { event_type: "codex_app_used", event_params, })); } fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec) { let PluginUsedInput { tracking, plugin } = input; out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest { event_type: "codex_plugin_used", event_params: codex_plugin_used_metadata(&tracking, plugin), })); } fn ingest_plugin_state_changed( &mut self, input: PluginStateChangedInput, out: &mut Vec, ) { let PluginStateChangedInput { plugin, state } = input; let event = CodexPluginEventRequest { event_type: plugin_state_event_type(state), event_params: codex_plugin_metadata(plugin), }; out.push(match state { PluginState::Installed => TrackEventRequest::PluginInstalled(event), PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event), PluginState::Enabled => TrackEventRequest::PluginEnabled(event), PluginState::Disabled => TrackEventRequest::PluginDisabled(event), }); } } fn plugin_state_event_type(state: PluginState) -> &'static str { match state { PluginState::Installed => "codex_plugin_installed", PluginState::Uninstalled => "codex_plugin_uninstalled", PluginState::Enabled => "codex_plugin_enabled", PluginState::Disabled => "codex_plugin_disabled", } } fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata { CodexAppMetadata { connector_id: app.connector_id, thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), app_name: app.app_name, product_client_id: Some(originator().value), invoke_type: app.invocation_type, model_slug: Some(tracking.model_slug.clone()), } } #[cfg(test)] fn codex_thread_initialized_event_params( thread_event: CodexThreadInitializedEvent, ) -> CodexThreadInitializedEventParams { codex_thread_initialized_event_params_with_product_client_id(originator().value, thread_event) } fn codex_thread_initialized_event_request( product_client_id: String, thread_event: CodexThreadInitializedEvent, ) -> CodexThreadInitializedEventRequest { CodexThreadInitializedEventRequest { event_type: "codex_thread_initialized", event_params: codex_thread_initialized_event_params_with_product_client_id( product_client_id, thread_event, ), } } fn codex_thread_initialized_event_params_with_product_client_id( product_client_id: String, thread_event: CodexThreadInitializedEvent, ) -> CodexThreadInitializedEventParams { CodexThreadInitializedEventParams { thread_id: thread_event.thread_id, product_client_id, model: thread_event.model, ephemeral: thread_event.ephemeral, session_source: session_source_name(&thread_event.session_source), initialization_mode: thread_event.initialization_mode, subagent_source: thread_event.subagent_source.map(subagent_source_name), parent_thread_id: thread_event.parent_thread_id, created_at: thread_event.created_at, } } fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata { let capability_summary = plugin.capability_summary; CodexPluginMetadata { plugin_id: Some(plugin.plugin_id.as_key()), plugin_name: Some(plugin.plugin_id.plugin_name), marketplace_name: Some(plugin.plugin_id.marketplace_name), has_skills: capability_summary .as_ref() .map(|summary| summary.has_skills), mcp_server_count: capability_summary .as_ref() .map(|summary| summary.mcp_server_names.len()), connector_ids: capability_summary.map(|summary| { summary .app_connector_ids .into_iter() .map(|connector_id| connector_id.0) .collect() }), product_client_id: Some(originator().value), } } fn codex_plugin_used_metadata( tracking: &TrackEventsContext, plugin: PluginTelemetryMetadata, ) -> CodexPluginUsedMetadata { CodexPluginUsedMetadata { plugin: codex_plugin_metadata(plugin), thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), model_slug: Some(tracking.model_slug.clone()), } } fn session_source_name(session_source: &SessionSource) -> Option<&'static str> { match session_source { SessionSource::Cli | SessionSource::VSCode | SessionSource::Exec => Some("user"), SessionSource::SubAgent(_) => Some("subagent"), SessionSource::Mcp | SessionSource::Custom(_) | SessionSource::Unknown => None, } } fn subagent_source_name(subagent_source: SubAgentSource) -> String { match subagent_source { SubAgentSource::Review => "review".to_string(), SubAgentSource::Compact => "compact".to_string(), SubAgentSource::ThreadSpawn { .. } => "thread_spawn".to_string(), SubAgentSource::MemoryConsolidation => "memory_consolidation".to_string(), SubAgentSource::Other(other) => other, } } async fn send_track_events( auth_manager: &AuthManager, base_url: &str, events: Vec, ) { if events.is_empty() { return; } let Some(auth) = auth_manager.auth().await else { return; }; if !auth.is_chatgpt_auth() { return; } let access_token = match auth.get_token() { Ok(token) => token, Err(_) => return, }; let Some(account_id) = auth.get_account_id() else { return; }; let base_url = base_url.trim_end_matches('/'); let url = format!("{base_url}/codex/analytics-events/events"); let payload = TrackEventsRequest { events }; let response = create_client() .post(&url) .timeout(ANALYTICS_EVENTS_TIMEOUT) .bearer_auth(&access_token) .header("chatgpt-account-id", &account_id) .header("Content-Type", "application/json") .json(&payload) .send() .await; match response { Ok(response) if response.status().is_success() => {} Ok(response) => { let status = response.status(); let body = response.text().await.unwrap_or_default(); tracing::warn!("events failed with status {status}: {body}"); } Err(err) => { tracing::warn!("failed to send events request: {err}"); } } } pub(crate) fn skill_id_for_local_skill( repo_url: Option<&str>, repo_root: Option<&Path>, skill_path: &Path, skill_name: &str, ) -> String { let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path); let prefix = if let Some(url) = repo_url { format!("repo_{url}") } else { "personal".to_string() }; let raw_id = format!("{prefix}_{path}_{skill_name}"); let mut hasher = Sha1::new(); hasher.update(raw_id.as_bytes()); format!("{:x}", hasher.finalize()) } /// Returns a normalized path for skill ID construction. /// /// - Repo-scoped skills use a path relative to the repo root. /// - User/admin/system skills use an absolute path. fn normalize_path_for_skill_id( repo_url: Option<&str>, repo_root: Option<&Path>, skill_path: &Path, ) -> String { let resolved_path = std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf()); match (repo_url, repo_root) { (Some(_), Some(root)) => { let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf()); resolved_path .strip_prefix(&resolved_root) .unwrap_or(resolved_path.as_path()) .to_string_lossy() .replace('\\', "/") } _ => resolved_path.to_string_lossy().replace('\\', "/"), } } #[cfg(test)] #[path = "analytics_client_tests.rs"] mod tests;