mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
[codex-analytics] refactor analytics to use reducer architecture (#16225)
- rework codex analytics crate to use reducer / publish architecture - in anticipation of extensive codex analytics
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1337,6 +1337,7 @@ checksum = "e9b18233253483ce2f65329a24072ec414db782531bdbb7d0bbc4bd2ce6b7e21"
|
||||
name = "codex-analytics"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"codex-git-utils",
|
||||
"codex-login",
|
||||
"codex-plugin",
|
||||
|
||||
@@ -13,6 +13,7 @@ path = "src/lib.rs"
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-plugin = { workspace = true }
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::AuthManager;
|
||||
@@ -56,9 +61,73 @@ pub struct AppInvocation {
|
||||
pub invocation_type: Option<InvocationType>,
|
||||
}
|
||||
|
||||
pub enum AnalyticsFact {
|
||||
Initialize {
|
||||
connection_id: u64,
|
||||
params: InitializeParams,
|
||||
},
|
||||
Request {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
Custom(CustomAnalyticsFact),
|
||||
}
|
||||
|
||||
pub enum CustomAnalyticsFact {
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
PluginUsed(PluginUsedInput),
|
||||
PluginStateChanged(PluginStateChangedInput),
|
||||
}
|
||||
|
||||
pub struct SkillInvokedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub invocations: Vec<SkillInvocation>,
|
||||
}
|
||||
|
||||
pub struct AppMentionedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub mentions: Vec<AppInvocation>,
|
||||
}
|
||||
|
||||
pub struct AppUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub app: AppInvocation,
|
||||
}
|
||||
|
||||
pub struct PluginUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
}
|
||||
|
||||
pub struct PluginStateChangedInput {
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
pub state: PluginState,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum PluginState {
|
||||
Installed,
|
||||
Uninstalled,
|
||||
Enabled,
|
||||
Disabled,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct AnalyticsReducer;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AnalyticsEventsQueue {
|
||||
sender: mpsc::Sender<TrackEventsJob>,
|
||||
sender: mpsc::Sender<AnalyticsFact>,
|
||||
app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
}
|
||||
@@ -73,33 +142,11 @@ impl AnalyticsEventsQueue {
|
||||
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
|
||||
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
|
||||
tokio::spawn(async move {
|
||||
while let Some(job) = receiver.recv().await {
|
||||
match job {
|
||||
TrackEventsJob::SkillInvocations(job) => {
|
||||
send_track_skill_invocations(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::AppMentioned(job) => {
|
||||
send_track_app_mentioned(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::AppUsed(job) => {
|
||||
send_track_app_used(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::PluginUsed(job) => {
|
||||
send_track_plugin_used(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::PluginInstalled(job) => {
|
||||
send_track_plugin_installed(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::PluginUninstalled(job) => {
|
||||
send_track_plugin_uninstalled(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::PluginEnabled(job) => {
|
||||
send_track_plugin_enabled(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
TrackEventsJob::PluginDisabled(job) => {
|
||||
send_track_plugin_disabled(&auth_manager, &base_url, job).await;
|
||||
}
|
||||
}
|
||||
let mut reducer = AnalyticsReducer;
|
||||
while let Some(input) = receiver.recv().await {
|
||||
let mut events = Vec::new();
|
||||
reducer.ingest(input, &mut events).await;
|
||||
send_track_events(&auth_manager, &base_url, events).await;
|
||||
}
|
||||
});
|
||||
Self {
|
||||
@@ -109,8 +156,8 @@ impl AnalyticsEventsQueue {
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, job: TrackEventsJob) {
|
||||
if self.sender.try_send(job).is_err() {
|
||||
fn try_send(&self, input: AnalyticsFact) {
|
||||
if self.sender.try_send(input).is_err() {
|
||||
//TODO: add a metric for this
|
||||
tracing::warn!("dropping analytics events: queue is full");
|
||||
}
|
||||
@@ -163,114 +210,86 @@ impl AnalyticsEventsClient {
|
||||
tracking: TrackEventsContext,
|
||||
invocations: Vec<SkillInvocation>,
|
||||
) {
|
||||
track_skill_invocations(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
Some(tracking),
|
||||
invocations,
|
||||
);
|
||||
if invocations.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
|
||||
SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
},
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
|
||||
track_app_mentioned(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
Some(tracking),
|
||||
mentions,
|
||||
);
|
||||
if mentions.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
|
||||
AppMentionedInput { tracking, mentions },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
track_app_used(&self.queue, self.analytics_enabled, Some(tracking), app);
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
AppUsedInput { tracking, app },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
track_plugin_used(&self.queue, self.analytics_enabled, Some(tracking), plugin);
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
PluginUsedInput { tracking, plugin },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
track_plugin_management(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
PluginManagementEventType::Installed,
|
||||
plugin,
|
||||
);
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Installed,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
|
||||
track_plugin_management(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
PluginManagementEventType::Uninstalled,
|
||||
plugin,
|
||||
);
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Uninstalled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
track_plugin_management(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
PluginManagementEventType::Enabled,
|
||||
plugin,
|
||||
);
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Enabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
track_plugin_management(
|
||||
&self.queue,
|
||||
self.analytics_enabled,
|
||||
PluginManagementEventType::Disabled,
|
||||
plugin,
|
||||
);
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Disabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
enum TrackEventsJob {
|
||||
SkillInvocations(TrackSkillInvocationsJob),
|
||||
AppMentioned(TrackAppMentionedJob),
|
||||
AppUsed(TrackAppUsedJob),
|
||||
PluginUsed(TrackPluginUsedJob),
|
||||
PluginInstalled(TrackPluginManagementJob),
|
||||
PluginUninstalled(TrackPluginManagementJob),
|
||||
PluginEnabled(TrackPluginManagementJob),
|
||||
PluginDisabled(TrackPluginManagementJob),
|
||||
}
|
||||
|
||||
struct TrackSkillInvocationsJob {
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: TrackEventsContext,
|
||||
invocations: Vec<SkillInvocation>,
|
||||
}
|
||||
|
||||
struct TrackAppMentionedJob {
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: TrackEventsContext,
|
||||
mentions: Vec<AppInvocation>,
|
||||
}
|
||||
|
||||
struct TrackAppUsedJob {
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: TrackEventsContext,
|
||||
app: AppInvocation,
|
||||
}
|
||||
|
||||
struct TrackPluginUsedJob {
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: TrackEventsContext,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
}
|
||||
|
||||
struct TrackPluginManagementJob {
|
||||
analytics_enabled: Option<bool>,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum PluginManagementEventType {
|
||||
Installed,
|
||||
Uninstalled,
|
||||
Enabled,
|
||||
Disabled,
|
||||
fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
}
|
||||
|
||||
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
|
||||
@@ -368,286 +387,145 @@ struct CodexPluginUsedEventRequest {
|
||||
event_params: CodexPluginUsedMetadata,
|
||||
}
|
||||
|
||||
pub(crate) fn track_skill_invocations(
|
||||
queue: &AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: Option<TrackEventsContext>,
|
||||
invocations: Vec<SkillInvocation>,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
let Some(tracking) = tracking else {
|
||||
return;
|
||||
};
|
||||
if invocations.is_empty() {
|
||||
return;
|
||||
}
|
||||
let job = TrackEventsJob::SkillInvocations(TrackSkillInvocationsJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
invocations,
|
||||
});
|
||||
queue.try_send(job);
|
||||
}
|
||||
|
||||
pub(crate) fn track_app_mentioned(
|
||||
queue: &AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: Option<TrackEventsContext>,
|
||||
mentions: Vec<AppInvocation>,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
let Some(tracking) = tracking else {
|
||||
return;
|
||||
};
|
||||
if mentions.is_empty() {
|
||||
return;
|
||||
}
|
||||
let job = TrackEventsJob::AppMentioned(TrackAppMentionedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
mentions,
|
||||
});
|
||||
queue.try_send(job);
|
||||
}
|
||||
|
||||
pub(crate) fn track_app_used(
|
||||
queue: &AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: Option<TrackEventsContext>,
|
||||
app: AppInvocation,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
let Some(tracking) = tracking else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
let job = TrackEventsJob::AppUsed(TrackAppUsedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
app,
|
||||
});
|
||||
queue.try_send(job);
|
||||
}
|
||||
|
||||
pub(crate) fn track_plugin_used(
|
||||
queue: &AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
tracking: Option<TrackEventsContext>,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
let Some(tracking) = tracking else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
let job = TrackEventsJob::PluginUsed(TrackPluginUsedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
plugin,
|
||||
});
|
||||
queue.try_send(job);
|
||||
}
|
||||
|
||||
fn track_plugin_management(
|
||||
queue: &AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
event_type: PluginManagementEventType,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
let job = TrackPluginManagementJob {
|
||||
analytics_enabled,
|
||||
plugin,
|
||||
};
|
||||
let job = match event_type {
|
||||
PluginManagementEventType::Installed => TrackEventsJob::PluginInstalled(job),
|
||||
PluginManagementEventType::Uninstalled => TrackEventsJob::PluginUninstalled(job),
|
||||
PluginManagementEventType::Enabled => TrackEventsJob::PluginEnabled(job),
|
||||
PluginManagementEventType::Disabled => TrackEventsJob::PluginDisabled(job),
|
||||
};
|
||||
queue.try_send(job);
|
||||
}
|
||||
|
||||
async fn send_track_skill_invocations(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackSkillInvocationsJob,
|
||||
) {
|
||||
let TrackSkillInvocationsJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
invocations,
|
||||
} = job;
|
||||
let mut events = Vec::with_capacity(invocations.len());
|
||||
for invocation in invocations {
|
||||
let skill_scope = match invocation.skill_scope {
|
||||
SkillScope::User => "user",
|
||||
SkillScope::Repo => "repo",
|
||||
SkillScope::System => "system",
|
||||
SkillScope::Admin => "admin",
|
||||
};
|
||||
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
|
||||
let repo_url = if let Some(root) = repo_root.as_ref() {
|
||||
collect_git_info(root)
|
||||
.await
|
||||
.and_then(|info| info.repository_url)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let skill_id = skill_id_for_local_skill(
|
||||
repo_url.as_deref(),
|
||||
repo_root.as_deref(),
|
||||
invocation.skill_path.as_path(),
|
||||
invocation.skill_name.as_str(),
|
||||
);
|
||||
events.push(TrackEventRequest::SkillInvocation(
|
||||
SkillInvocationEventRequest {
|
||||
event_type: "skill_invocation",
|
||||
skill_id,
|
||||
skill_name: invocation.skill_name.clone(),
|
||||
event_params: SkillInvocationEventParams {
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
invoke_type: Some(invocation.invocation_type),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
product_client_id: Some(originator().value),
|
||||
repo_url,
|
||||
skill_scope: Some(skill_scope.to_string()),
|
||||
},
|
||||
impl AnalyticsReducer {
|
||||
async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
|
||||
match input {
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: _connection_id,
|
||||
params: _params,
|
||||
} => {}
|
||||
AnalyticsFact::Request {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::Response {
|
||||
connection_id: _connection_id,
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::AppMentioned(input) => {
|
||||
self.ingest_app_mentioned(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::AppUsed(input) => {
|
||||
self.ingest_app_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginUsed(input) => {
|
||||
self.ingest_plugin_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginStateChanged(input) => {
|
||||
self.ingest_plugin_state_changed(input, out);
|
||||
}
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
|
||||
}
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
} = input;
|
||||
for invocation in invocations {
|
||||
let skill_scope = match invocation.skill_scope {
|
||||
SkillScope::User => "user",
|
||||
SkillScope::Repo => "repo",
|
||||
SkillScope::System => "system",
|
||||
SkillScope::Admin => "admin",
|
||||
};
|
||||
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
|
||||
let repo_url = if let Some(root) = repo_root.as_ref() {
|
||||
collect_git_info(root)
|
||||
.await
|
||||
.and_then(|info| info.repository_url)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let skill_id = skill_id_for_local_skill(
|
||||
repo_url.as_deref(),
|
||||
repo_root.as_deref(),
|
||||
invocation.skill_path.as_path(),
|
||||
invocation.skill_name.as_str(),
|
||||
);
|
||||
out.push(TrackEventRequest::SkillInvocation(
|
||||
SkillInvocationEventRequest {
|
||||
event_type: "skill_invocation",
|
||||
skill_id,
|
||||
skill_name: invocation.skill_name.clone(),
|
||||
event_params: SkillInvocationEventParams {
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
invoke_type: Some(invocation.invocation_type),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
product_client_id: Some(originator().value),
|
||||
repo_url,
|
||||
skill_scope: Some(skill_scope.to_string()),
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_app_mentioned(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackAppMentionedJob,
|
||||
) {
|
||||
let TrackAppMentionedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
mentions,
|
||||
} = job;
|
||||
let events = mentions
|
||||
.into_iter()
|
||||
.map(|mention| {
|
||||
fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppMentionedInput { tracking, mentions } = input;
|
||||
out.extend(mentions.into_iter().map(|mention| {
|
||||
let event_params = codex_app_metadata(&tracking, mention);
|
||||
TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest {
|
||||
event_type: "codex_app_mentioned",
|
||||
event_params,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
}));
|
||||
}
|
||||
|
||||
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
|
||||
fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppUsedInput { tracking, app } = input;
|
||||
let event_params = codex_app_metadata(&tracking, app);
|
||||
out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
|
||||
event_type: "codex_app_used",
|
||||
event_params,
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let PluginUsedInput { tracking, plugin } = input;
|
||||
out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
|
||||
event_type: "codex_plugin_used",
|
||||
event_params: codex_plugin_used_metadata(&tracking, plugin),
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_state_changed(
|
||||
&mut self,
|
||||
input: PluginStateChangedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let PluginStateChangedInput { plugin, state } = input;
|
||||
let event = CodexPluginEventRequest {
|
||||
event_type: plugin_state_event_type(state),
|
||||
event_params: codex_plugin_metadata(plugin),
|
||||
};
|
||||
out.push(match state {
|
||||
PluginState::Installed => TrackEventRequest::PluginInstalled(event),
|
||||
PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
|
||||
PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
|
||||
PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_app_used(auth_manager: &AuthManager, base_url: &str, job: TrackAppUsedJob) {
|
||||
let TrackAppUsedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
app,
|
||||
} = job;
|
||||
let event_params = codex_app_metadata(&tracking, app);
|
||||
let events = vec![TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
|
||||
event_type: "codex_app_used",
|
||||
event_params,
|
||||
})];
|
||||
|
||||
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_used(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginUsedJob,
|
||||
) {
|
||||
let TrackPluginUsedJob {
|
||||
analytics_enabled,
|
||||
tracking,
|
||||
plugin,
|
||||
} = job;
|
||||
let events = vec![TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
|
||||
event_type: "codex_plugin_used",
|
||||
event_params: codex_plugin_used_metadata(&tracking, plugin),
|
||||
})];
|
||||
|
||||
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_installed(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginManagementJob,
|
||||
) {
|
||||
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_installed").await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_uninstalled(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginManagementJob,
|
||||
) {
|
||||
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_uninstalled")
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_enabled(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginManagementJob,
|
||||
) {
|
||||
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_enabled").await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_disabled(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginManagementJob,
|
||||
) {
|
||||
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_disabled").await;
|
||||
}
|
||||
|
||||
async fn send_track_plugin_management_event(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
job: TrackPluginManagementJob,
|
||||
event_type: &'static str,
|
||||
) {
|
||||
let TrackPluginManagementJob {
|
||||
analytics_enabled,
|
||||
plugin,
|
||||
} = job;
|
||||
let event_params = codex_plugin_metadata(plugin);
|
||||
let event = CodexPluginEventRequest {
|
||||
event_type,
|
||||
event_params,
|
||||
};
|
||||
let events = vec![match event_type {
|
||||
"codex_plugin_installed" => TrackEventRequest::PluginInstalled(event),
|
||||
"codex_plugin_uninstalled" => TrackEventRequest::PluginUninstalled(event),
|
||||
"codex_plugin_enabled" => TrackEventRequest::PluginEnabled(event),
|
||||
"codex_plugin_disabled" => TrackEventRequest::PluginDisabled(event),
|
||||
_ => unreachable!("unknown plugin management event type"),
|
||||
}];
|
||||
|
||||
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
|
||||
fn plugin_state_event_type(state: PluginState) -> &'static str {
|
||||
match state {
|
||||
PluginState::Installed => "codex_plugin_installed",
|
||||
PluginState::Uninstalled => "codex_plugin_uninstalled",
|
||||
PluginState::Enabled => "codex_plugin_enabled",
|
||||
PluginState::Disabled => "codex_plugin_disabled",
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata {
|
||||
@@ -699,13 +577,9 @@ fn codex_plugin_used_metadata(
|
||||
|
||||
async fn send_track_events(
|
||||
auth_manager: &AuthManager,
|
||||
analytics_enabled: Option<bool>,
|
||||
base_url: &str,
|
||||
events: Vec<TrackEventRequest>,
|
||||
) {
|
||||
if analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
if events.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,20 @@
|
||||
use super::AnalyticsEventsQueue;
|
||||
use super::AnalyticsFact;
|
||||
use super::AnalyticsReducer;
|
||||
use super::AppInvocation;
|
||||
use super::AppMentionedInput;
|
||||
use super::AppUsedInput;
|
||||
use super::CodexAppMentionedEventRequest;
|
||||
use super::CodexAppUsedEventRequest;
|
||||
use super::CodexPluginEventRequest;
|
||||
use super::CodexPluginUsedEventRequest;
|
||||
use super::CustomAnalyticsFact;
|
||||
use super::InvocationType;
|
||||
use super::PluginState;
|
||||
use super::PluginStateChangedInput;
|
||||
use super::PluginUsedInput;
|
||||
use super::SkillInvocation;
|
||||
use super::SkillInvokedInput;
|
||||
use super::TrackEventRequest;
|
||||
use super::TrackEventsContext;
|
||||
use super::codex_app_metadata;
|
||||
@@ -280,6 +290,145 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() {
|
||||
assert_eq!(queue.should_enqueue_plugin_used(&turn_2, &plugin), true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_skill_invoked_fact() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut events = Vec::new();
|
||||
let tracking = TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
};
|
||||
let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md");
|
||||
let expected_skill_id = super::skill_id_for_local_skill(
|
||||
/*repo_url*/ None,
|
||||
/*repo_root*/ None,
|
||||
skill_path.as_path(),
|
||||
"doc",
|
||||
);
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(SkillInvokedInput {
|
||||
tracking,
|
||||
invocations: vec![SkillInvocation {
|
||||
skill_name: "doc".to_string(),
|
||||
skill_scope: codex_protocol::protocol::SkillScope::User,
|
||||
skill_path,
|
||||
invocation_type: InvocationType::Explicit,
|
||||
}],
|
||||
})),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "skill_invocation",
|
||||
"skill_id": expected_skill_id,
|
||||
"skill_name": "doc",
|
||||
"event_params": {
|
||||
"product_client_id": originator().value,
|
||||
"skill_scope": "user",
|
||||
"repo_url": null,
|
||||
"thread_id": "thread-1",
|
||||
"invoke_type": "explicit",
|
||||
"model_slug": "gpt-5"
|
||||
}
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_app_and_plugin_facts() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut events = Vec::new();
|
||||
let tracking = TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
};
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(AppMentionedInput {
|
||||
tracking: tracking.clone(),
|
||||
mentions: vec![AppInvocation {
|
||||
connector_id: Some("calendar".to_string()),
|
||||
app_name: Some("Calendar".to_string()),
|
||||
invocation_type: Some(InvocationType::Explicit),
|
||||
}],
|
||||
})),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(AppUsedInput {
|
||||
tracking: tracking.clone(),
|
||||
app: AppInvocation {
|
||||
connector_id: Some("drive".to_string()),
|
||||
app_name: Some("Drive".to_string()),
|
||||
invocation_type: Some(InvocationType::Implicit),
|
||||
},
|
||||
})),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(PluginUsedInput {
|
||||
tracking,
|
||||
plugin: sample_plugin_metadata(),
|
||||
})),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 3);
|
||||
assert_eq!(payload[0]["event_type"], "codex_app_mentioned");
|
||||
assert_eq!(payload[1]["event_type"], "codex_app_used");
|
||||
assert_eq!(payload[2]["event_type"], "codex_plugin_used");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::PluginStateChanged(
|
||||
PluginStateChangedInput {
|
||||
plugin: sample_plugin_metadata(),
|
||||
state: PluginState::Disabled,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "codex_plugin_disabled",
|
||||
"event_params": {
|
||||
"plugin_id": "sample@test",
|
||||
"plugin_name": "sample",
|
||||
"marketplace_name": "test",
|
||||
"has_skills": true,
|
||||
"mcp_server_count": 2,
|
||||
"connector_ids": ["calendar", "drive"],
|
||||
"product_client_id": originator().value
|
||||
}
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
|
||||
PluginTelemetryMetadata {
|
||||
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),
|
||||
|
||||
@@ -1,8 +1,17 @@
|
||||
mod analytics_client;
|
||||
|
||||
pub use analytics_client::AnalyticsEventsClient;
|
||||
pub use analytics_client::AnalyticsFact;
|
||||
pub use analytics_client::AnalyticsReducer;
|
||||
pub use analytics_client::AppInvocation;
|
||||
pub use analytics_client::AppMentionedInput;
|
||||
pub use analytics_client::AppUsedInput;
|
||||
pub use analytics_client::CustomAnalyticsFact;
|
||||
pub use analytics_client::InvocationType;
|
||||
pub use analytics_client::PluginState;
|
||||
pub use analytics_client::PluginStateChangedInput;
|
||||
pub use analytics_client::PluginUsedInput;
|
||||
pub use analytics_client::SkillInvocation;
|
||||
pub use analytics_client::SkillInvokedInput;
|
||||
pub use analytics_client::TrackEventsContext;
|
||||
pub use analytics_client::build_track_events_context;
|
||||
|
||||
Reference in New Issue
Block a user