From b164ac6d1e9df800ea241e1a01d06474a43a815c Mon Sep 17 00:00:00 2001 From: alexsong-oai Date: Sat, 31 Jan 2026 18:06:26 -0800 Subject: [PATCH] feat: fire tracking events for skill invocation (#10120) --- codex-rs/core/src/analytics_client.rs | 331 ++++++++++++++++++++++++++ codex-rs/core/src/codex.rs | 24 +- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/skills/injection.rs | 13 + codex-rs/core/src/state/service.rs | 2 + 5 files changed, 370 insertions(+), 1 deletion(-) create mode 100644 codex-rs/core/src/analytics_client.rs diff --git a/codex-rs/core/src/analytics_client.rs b/codex-rs/core/src/analytics_client.rs new file mode 100644 index 0000000000..d625166b09 --- /dev/null +++ b/codex-rs/core/src/analytics_client.rs @@ -0,0 +1,331 @@ +use crate::AuthManager; +use crate::config::Config; +use crate::default_client::create_client; +use crate::git_info::collect_git_info; +use crate::git_info::get_git_repo_root; +use codex_protocol::protocol::SkillScope; +use serde::Serialize; +use sha1::Digest; +use sha1::Sha1; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; + +#[derive(Clone)] +pub(crate) struct TrackEventsContext { + pub(crate) model_slug: String, + pub(crate) thread_id: String, +} + +pub(crate) fn build_track_events_context( + model_slug: String, + thread_id: String, +) -> TrackEventsContext { + TrackEventsContext { + model_slug, + thread_id, + } +} + +pub(crate) struct SkillInvocation { + pub(crate) skill_name: String, + pub(crate) skill_scope: SkillScope, + pub(crate) skill_path: PathBuf, +} + +#[derive(Clone)] +pub(crate) struct AnalyticsEventsQueue { + sender: mpsc::Sender, +} + +pub(crate) struct AnalyticsEventsClient { + queue: AnalyticsEventsQueue, + config: Arc, +} + +impl AnalyticsEventsQueue { + pub(crate) fn new(auth_manager: Arc) -> Self { + let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); + tokio::spawn(async move { + while let Some(job) = receiver.recv().await { + send_track_skill_invocations(&auth_manager, job).await; + } + }); + Self { sender } + } + + fn try_send(&self, job: TrackEventsJob) { + if self.sender.try_send(job).is_err() { + //TODO: add a metric for this + tracing::warn!("dropping skill analytics events: queue is full"); + } + } +} + +impl AnalyticsEventsClient { + pub(crate) fn new(config: Arc, auth_manager: Arc) -> Self { + Self { + queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager)), + config, + } + } + + pub(crate) fn track_skill_invocations( + &self, + tracking: TrackEventsContext, + invocations: Vec, + ) { + track_skill_invocations( + &self.queue, + Arc::clone(&self.config), + Some(tracking), + invocations, + ); + } +} + +struct TrackEventsJob { + config: Arc, + tracking: TrackEventsContext, + invocations: Vec, +} + +const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256; +const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Serialize)] +struct TrackEventsRequest { + events: Vec, +} + +#[derive(Serialize)] +struct TrackEvent { + event_type: &'static str, + skill_id: String, + skill_name: String, + event_params: TrackEventParams, +} + +#[derive(Serialize)] +struct TrackEventParams { + product_client_id: Option, + skill_scope: Option, + repo_url: Option, + thread_id: Option, + invoke_type: Option, + model_slug: Option, +} + +pub(crate) fn track_skill_invocations( + queue: &AnalyticsEventsQueue, + config: Arc, + tracking: Option, + invocations: Vec, +) { + if config.analytics_enabled == Some(false) { + return; + } + let Some(tracking) = tracking else { + return; + }; + if invocations.is_empty() { + return; + } + let job = TrackEventsJob { + config, + tracking, + invocations, + }; + queue.try_send(job); +} + +async fn send_track_skill_invocations(auth_manager: &AuthManager, job: TrackEventsJob) { + let TrackEventsJob { + config, + tracking, + invocations, + } = job; + let Some(auth) = auth_manager.auth().await else { + return; + }; + if !auth.is_chatgpt_auth() { + return; + } + let access_token = match auth.get_token() { + Ok(token) => token, + Err(_) => return, + }; + let Some(account_id) = auth.get_account_id() else { + return; + }; + + let mut events = Vec::with_capacity(invocations.len()); + for invocation in invocations { + let skill_scope = match invocation.skill_scope { + SkillScope::User => "user", + SkillScope::Repo => "repo", + SkillScope::System => "system", + SkillScope::Admin => "admin", + }; + let repo_root = get_git_repo_root(invocation.skill_path.as_path()); + let repo_url = if let Some(root) = repo_root.as_ref() { + collect_git_info(root) + .await + .and_then(|info| info.repository_url) + } else { + None + }; + let skill_id = skill_id_for_local_skill( + repo_url.as_deref(), + repo_root.as_deref(), + invocation.skill_path.as_path(), + invocation.skill_name.as_str(), + ); + events.push(TrackEvent { + event_type: "skill_invocation", + skill_id, + skill_name: invocation.skill_name.clone(), + event_params: TrackEventParams { + thread_id: Some(tracking.thread_id.clone()), + invoke_type: Some("explicit".to_string()), + model_slug: Some(tracking.model_slug.clone()), + product_client_id: Some(crate::default_client::originator().value), + repo_url, + skill_scope: Some(skill_scope.to_string()), + }, + }); + } + + let base_url = config.chatgpt_base_url.trim_end_matches('/'); + let url = format!("{base_url}/codex/analytics-events/events"); + let payload = TrackEventsRequest { events }; + + let response = create_client() + .post(&url) + .timeout(ANALYTICS_EVENTS_TIMEOUT) + .bearer_auth(&access_token) + .header("chatgpt-account-id", &account_id) + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await; + + match response { + Ok(response) if response.status().is_success() => {} + Ok(response) => { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + tracing::warn!("events failed with status {status}: {body}"); + } + Err(err) => { + tracing::warn!("failed to send events request: {err}"); + } + } +} + +fn skill_id_for_local_skill( + repo_url: Option<&str>, + repo_root: Option<&Path>, + skill_path: &Path, + skill_name: &str, +) -> String { + let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path); + let prefix = if let Some(url) = repo_url { + format!("repo_{url}") + } else { + "personal".to_string() + }; + let raw_id = format!("{prefix}_{path}_{skill_name}"); + let mut hasher = Sha1::new(); + hasher.update(raw_id.as_bytes()); + format!("{:x}", hasher.finalize()) +} + +/// Returns a normalized path for skill ID construction. +/// +/// - Repo-scoped skills use a path relative to the repo root. +/// - User/admin/system skills use an absolute path. +fn normalize_path_for_skill_id( + repo_url: Option<&str>, + repo_root: Option<&Path>, + skill_path: &Path, +) -> String { + let resolved_path = + std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf()); + match (repo_url, repo_root) { + (Some(_), Some(root)) => { + let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf()); + resolved_path + .strip_prefix(&resolved_root) + .unwrap_or(resolved_path.as_path()) + .to_string_lossy() + .replace('\\', "/") + } + _ => resolved_path.to_string_lossy().replace('\\', "/"), + } +} + +#[cfg(test)] +mod tests { + use super::normalize_path_for_skill_id; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + fn expected_absolute_path(path: &PathBuf) -> String { + std::fs::canonicalize(path) + .unwrap_or_else(|_| path.to_path_buf()) + .to_string_lossy() + .replace('\\', "/") + } + + #[test] + fn normalize_path_for_skill_id_repo_scoped_uses_relative_path() { + let repo_root = PathBuf::from("/repo/root"); + let skill_path = PathBuf::from("/repo/root/.codex/skills/doc/SKILL.md"); + + let path = normalize_path_for_skill_id( + Some("https://example.com/repo.git"), + Some(repo_root.as_path()), + skill_path.as_path(), + ); + + assert_eq!(path, ".codex/skills/doc/SKILL.md"); + } + + #[test] + fn normalize_path_for_skill_id_user_scoped_uses_absolute_path() { + let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md"); + + let path = normalize_path_for_skill_id(None, None, skill_path.as_path()); + let expected = expected_absolute_path(&skill_path); + + assert_eq!(path, expected); + } + + #[test] + fn normalize_path_for_skill_id_admin_scoped_uses_absolute_path() { + let skill_path = PathBuf::from("/etc/codex/skills/doc/SKILL.md"); + + let path = normalize_path_for_skill_id(None, None, skill_path.as_path()); + let expected = expected_absolute_path(&skill_path); + + assert_eq!(path, expected); + } + + #[test] + fn normalize_path_for_skill_id_repo_root_not_in_skill_path_uses_absolute_path() { + let repo_root = PathBuf::from("/repo/root"); + let skill_path = PathBuf::from("/other/path/.codex/skills/doc/SKILL.md"); + + let path = normalize_path_for_skill_id( + Some("https://example.com/repo.git"), + Some(repo_root.as_path()), + skill_path.as_path(), + ); + let expected = expected_absolute_path(&skill_path); + + assert_eq!(path, expected); + } +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 64d79d392f..f987357874 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -14,6 +14,8 @@ use crate::agent::AgentControl; use crate::agent::AgentStatus; use crate::agent::MAX_THREAD_SPAWN_DEPTH; use crate::agent::agent_status_from_event; +use crate::analytics_client::AnalyticsEventsClient; +use crate::analytics_client::build_track_events_context; use crate::compact; use crate::compact::run_inline_auto_compact_task; use crate::compact::should_use_remote_compact_task; @@ -904,6 +906,10 @@ impl Session { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + analytics_events_client: AnalyticsEventsClient::new( + Arc::clone(&config), + Arc::clone(&auth_manager), + ), notifier: UserNotifier::new(config.notify.clone()), rollout: Mutex::new(rollout_recorder), user_shell: Arc::new(default_shell), @@ -3385,10 +3391,18 @@ pub(crate) async fn run_turn( .await; let otel_manager = turn_context.client.get_otel_manager(); + let thread_id = sess.conversation_id.to_string(); + let tracking = build_track_events_context(turn_context.client.get_model(), thread_id); let SkillInjections { items: skill_items, warnings: skill_warnings, - } = build_skill_injections(&mentioned_skills, Some(&otel_manager)).await; + } = build_skill_injections( + &mentioned_skills, + Some(&otel_manager), + &sess.services.analytics_events_client, + tracking.clone(), + ) + .await; for message in skill_warnings { sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message })) @@ -5347,6 +5361,10 @@ mod tests { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + analytics_events_client: AnalyticsEventsClient::new( + Arc::clone(&config), + Arc::clone(&auth_manager), + ), notifier: UserNotifier::new(None), rollout: Mutex::new(None), user_shell: Arc::new(default_user_shell()), @@ -5463,6 +5481,10 @@ mod tests { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + analytics_events_client: AnalyticsEventsClient::new( + Arc::clone(&config), + Arc::clone(&auth_manager), + ), notifier: UserNotifier::new(None), rollout: Mutex::new(None), user_shell: Arc::new(default_user_shell()), diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 75b53624f4..94ba2f26b0 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -5,6 +5,7 @@ // the TUI or the tracing stack). #![deny(clippy::print_stdout, clippy::print_stderr)] +mod analytics_client; pub mod api_bridge; mod apply_patch; pub mod auth; diff --git a/codex-rs/core/src/skills/injection.rs b/codex-rs/core/src/skills/injection.rs index 57de457e90..77b3ceea10 100644 --- a/codex-rs/core/src/skills/injection.rs +++ b/codex-rs/core/src/skills/injection.rs @@ -2,6 +2,9 @@ use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; +use crate::analytics_client::AnalyticsEventsClient; +use crate::analytics_client::SkillInvocation; +use crate::analytics_client::TrackEventsContext; use crate::instructions::SkillInstructions; use crate::skills::SkillMetadata; use codex_otel::OtelManager; @@ -18,6 +21,8 @@ pub(crate) struct SkillInjections { pub(crate) async fn build_skill_injections( mentioned_skills: &[SkillMetadata], otel: Option<&OtelManager>, + analytics_client: &AnalyticsEventsClient, + tracking: TrackEventsContext, ) -> SkillInjections { if mentioned_skills.is_empty() { return SkillInjections::default(); @@ -27,11 +32,17 @@ pub(crate) async fn build_skill_injections( items: Vec::with_capacity(mentioned_skills.len()), warnings: Vec::new(), }; + let mut invocations = Vec::new(); for skill in mentioned_skills { match fs::read_to_string(&skill.path).await { Ok(contents) => { emit_skill_injected_metric(otel, skill, "ok"); + invocations.push(SkillInvocation { + skill_name: skill.name.clone(), + skill_scope: skill.scope, + skill_path: skill.path.clone(), + }); result.items.push(ResponseItem::from(SkillInstructions { name: skill.name.clone(), path: skill.path.to_string_lossy().into_owned(), @@ -50,6 +61,8 @@ pub(crate) async fn build_skill_injections( } } + analytics_client.track_skill_invocations(tracking, invocations); + result } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index e036b29b7d..d7788f71cb 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::AuthManager; use crate::RolloutRecorder; use crate::agent::AgentControl; +use crate::analytics_client::AnalyticsEventsClient; use crate::exec_policy::ExecPolicyManager; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; @@ -21,6 +22,7 @@ pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) notifier: UserNotifier, pub(crate) rollout: Mutex>, pub(crate) user_shell: Arc,