Add plugin usage telemetry

This commit is contained in:
alexsong-oai
2026-03-12 16:49:34 -07:00
parent 1ea69e8d50
commit 262ecd1699
13 changed files with 1053 additions and 4 deletions

View File

@@ -12,17 +12,24 @@ use codex_app_server_protocol::ConfigWriteResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::NetworkRequirements;
use codex_app_server_protocol::SandboxMode;
use codex_core::AnalyticsEventsClient;
use codex_core::ThreadManager;
use codex_core::config::CONFIG_TOML_FILE;
use codex_core::config::ConfigService;
use codex_core::config::ConfigServiceError;
use codex_core::config::ConfigToml;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigRequirementsToml;
use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::ResidencyRequirement as CoreResidencyRequirement;
use codex_core::config_loader::SandboxModeRequirement as CoreSandboxModeRequirement;
use codex_core::plugins::PluginId;
use codex_core::plugins::installed_plugin_telemetry_metadata;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::protocol::Op;
use serde_json::Value as JsonValue;
use serde_json::json;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
@@ -56,6 +63,7 @@ pub(crate) struct ConfigApi {
loader_overrides: LoaderOverrides,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
user_config_reloader: Arc<dyn UserConfigReloader>,
analytics_events_client: AnalyticsEventsClient,
}
impl ConfigApi {
@@ -65,6 +73,7 @@ impl ConfigApi {
loader_overrides: LoaderOverrides,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
user_config_reloader: Arc<dyn UserConfigReloader>,
analytics_events_client: AnalyticsEventsClient,
) -> Self {
Self {
codex_home,
@@ -72,6 +81,7 @@ impl ConfigApi {
loader_overrides,
cloud_requirements,
user_config_reloader,
analytics_events_client,
}
}
@@ -113,10 +123,17 @@ impl ConfigApi {
&self,
params: ConfigValueWriteParams,
) -> Result<ConfigWriteResponse, JSONRPCErrorError> {
self.config_service()
let target_path = config_write_target_path(&self.codex_home, params.file_path.as_deref());
let pending_changes =
collect_plugin_enabled_candidates([(&params.key_path, &params.value)].into_iter());
let previous_states = read_plugin_enabled_states(target_path.as_path(), &pending_changes);
let response = self
.config_service()
.write_value(params)
.await
.map_err(map_error)
.map_err(map_error)?;
self.emit_plugin_toggle_events(target_path.as_path(), previous_states, pending_changes);
Ok(response)
}
pub(crate) async fn batch_write(
@@ -124,16 +141,138 @@ impl ConfigApi {
params: ConfigBatchWriteParams,
) -> Result<ConfigWriteResponse, JSONRPCErrorError> {
let reload_user_config = params.reload_user_config;
let target_path = config_write_target_path(&self.codex_home, params.file_path.as_deref());
let pending_changes = collect_plugin_enabled_candidates(
params
.edits
.iter()
.map(|edit| (&edit.key_path, &edit.value)),
);
let previous_states = read_plugin_enabled_states(target_path.as_path(), &pending_changes);
let response = self
.config_service()
.batch_write(params)
.await
.map_err(map_error)?;
self.emit_plugin_toggle_events(target_path.as_path(), previous_states, pending_changes);
if reload_user_config {
self.user_config_reloader.reload_user_config().await;
}
Ok(response)
}
fn emit_plugin_toggle_events(
&self,
config_path: &std::path::Path,
previous_states: BTreeMap<String, Option<bool>>,
pending_changes: BTreeMap<String, bool>,
) {
if pending_changes.is_empty() {
return;
}
let updated_states = read_plugin_enabled_states(config_path, &pending_changes);
for (plugin_id, enabled) in
plugin_toggle_events_to_emit(&previous_states, &updated_states, pending_changes)
{
let Ok(plugin_id) = PluginId::parse(&plugin_id) else {
continue;
};
let metadata =
installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id);
if enabled {
self.analytics_events_client.track_plugin_enabled(metadata);
} else {
self.analytics_events_client.track_plugin_disabled(metadata);
}
}
}
}
fn config_write_target_path(codex_home: &std::path::Path, file_path: Option<&str>) -> PathBuf {
file_path
.map(PathBuf::from)
.unwrap_or_else(|| codex_home.join(CONFIG_TOML_FILE))
}
fn collect_plugin_enabled_candidates<'a>(
edits: impl Iterator<Item = (&'a String, &'a JsonValue)>,
) -> BTreeMap<String, bool> {
let mut pending_changes = BTreeMap::new();
for (key_path, value) in edits {
let segments = key_path
.split('.')
.map(str::to_string)
.collect::<Vec<String>>();
match segments.as_slice() {
[plugins, plugin_id, enabled]
if plugins == "plugins" && enabled == "enabled" && value.is_boolean() =>
{
if let Some(enabled) = value.as_bool() {
pending_changes.insert(plugin_id.clone(), enabled);
}
}
[plugins, plugin_id] if plugins == "plugins" => {
if let Some(enabled) = value.get("enabled").and_then(JsonValue::as_bool) {
pending_changes.insert(plugin_id.clone(), enabled);
}
}
[plugins] if plugins == "plugins" => {
let Some(entries) = value.as_object() else {
continue;
};
for (plugin_id, plugin_value) in entries {
let Some(enabled) = plugin_value.get("enabled").and_then(JsonValue::as_bool)
else {
continue;
};
pending_changes.insert(plugin_id.clone(), enabled);
}
}
_ => {}
}
}
pending_changes
}
fn read_plugin_enabled_states(
config_path: &std::path::Path,
pending_changes: &BTreeMap<String, bool>,
) -> BTreeMap<String, Option<bool>> {
let parsed_config = std::fs::read_to_string(config_path)
.ok()
.and_then(|contents| toml::from_str::<ConfigToml>(&contents).ok());
pending_changes
.keys()
.map(|plugin_id| {
let enabled = parsed_config
.as_ref()
.and_then(|config| config.plugins.get(plugin_id))
.map(|plugin| plugin.enabled);
(plugin_id.clone(), enabled)
})
.collect()
}
fn plugin_toggle_events_to_emit(
previous_states: &BTreeMap<String, Option<bool>>,
updated_states: &BTreeMap<String, Option<bool>>,
pending_changes: BTreeMap<String, bool>,
) -> Vec<(String, bool)> {
pending_changes
.into_iter()
.filter_map(|(plugin_id, enabled)| {
let previous_enabled = previous_states.get(&plugin_id).copied().flatten();
let updated_enabled = updated_states.get(&plugin_id).copied().flatten();
if previous_enabled == updated_enabled || updated_enabled != Some(enabled) {
None
} else {
Some((plugin_id, enabled))
}
})
.collect()
}
fn map_requirements_toml_to_api(requirements: ConfigRequirementsToml) -> ConfigRequirements {
@@ -229,6 +368,7 @@ fn config_write_error(code: ConfigWriteErrorCode, message: impl Into<String>) ->
#[cfg(test)]
mod tests {
use super::*;
use codex_core::AnalyticsEventsClient;
use codex_core::config_loader::NetworkRequirementsToml as CoreNetworkRequirementsToml;
use codex_protocol::protocol::AskForApproval as CoreAskForApproval;
use pretty_assertions::assert_eq;
@@ -359,12 +499,24 @@ mod tests {
let user_config_path = codex_home.path().join("config.toml");
std::fs::write(&user_config_path, "").expect("write config");
let reloader = Arc::new(RecordingUserConfigReloader::default());
let analytics_config = Arc::new(
codex_core::config::ConfigBuilder::default()
.build()
.await
.expect("load analytics config"),
);
let config_api = ConfigApi::new(
codex_home.path().to_path_buf(),
Vec::new(),
LoaderOverrides::default(),
Arc::new(RwLock::new(CloudRequirementsLoader::default())),
reloader.clone(),
AnalyticsEventsClient::new(
analytics_config,
codex_core::test_support::auth_manager_from_auth(
codex_core::CodexAuth::from_api_key("test"),
),
),
);
let response = config_api
@@ -399,4 +551,115 @@ mod tests {
);
assert_eq!(reloader.call_count.load(Ordering::Relaxed), 1);
}
#[test]
fn collect_plugin_enabled_candidates_tracks_direct_and_table_writes() {
let candidates = collect_plugin_enabled_candidates(
[
(&"plugins.sample@test.enabled".to_string(), &json!(true)),
(
&"plugins.other@test".to_string(),
&json!({ "enabled": false, "ignored": true }),
),
(
&"plugins".to_string(),
&json!({
"nested@test": { "enabled": true },
"skip@test": { "name": "skip" },
}),
),
]
.into_iter(),
);
assert_eq!(
candidates,
BTreeMap::from([
("nested@test".to_string(), true),
("other@test".to_string(), false),
("sample@test".to_string(), true),
])
);
}
#[test]
fn collect_plugin_enabled_candidates_uses_last_write_for_same_plugin() {
let candidates = collect_plugin_enabled_candidates(
[
(&"plugins.sample@test.enabled".to_string(), &json!(true)),
(
&"plugins.sample@test".to_string(),
&json!({ "enabled": false }),
),
]
.into_iter(),
);
assert_eq!(
candidates,
BTreeMap::from([("sample@test".to_string(), false)])
);
}
#[test]
fn read_plugin_enabled_states_reads_plugin_table_values() {
let temp_dir = TempDir::new().expect("create temp dir");
let config_path = temp_dir.path().join("config.toml");
std::fs::write(
&config_path,
"[plugins.\"sample@test\"]\nenabled = true\n\n[plugins.\"other@test\"]\nenabled = false\n",
)
.expect("write config");
let states = read_plugin_enabled_states(
&config_path,
&BTreeMap::from([
("missing@test".to_string(), true),
("other@test".to_string(), false),
("sample@test".to_string(), true),
]),
);
assert_eq!(
states,
BTreeMap::from([
("missing@test".to_string(), None),
("other@test".to_string(), Some(false)),
("sample@test".to_string(), Some(true)),
])
);
}
#[test]
fn plugin_toggle_events_to_emit_only_reports_real_state_transitions() {
let previous_states = BTreeMap::from([
("disabled@test".to_string(), Some(false)),
("enabled@test".to_string(), Some(true)),
("missing@test".to_string(), None),
("stays_enabled@test".to_string(), Some(true)),
]);
let updated_states = BTreeMap::from([
("disabled@test".to_string(), Some(true)),
("enabled@test".to_string(), Some(false)),
("missing@test".to_string(), None),
("stays_enabled@test".to_string(), Some(true)),
]);
let pending_changes = BTreeMap::from([
("disabled@test".to_string(), true),
("enabled@test".to_string(), false),
("missing@test".to_string(), true),
("stays_enabled@test".to_string(), true),
]);
let events =
plugin_toggle_events_to_emit(&previous_states, &updated_states, pending_changes);
assert_eq!(
events,
vec![
("disabled@test".to_string(), true),
("enabled@test".to_string(), false),
]
);
}
}

View File

@@ -39,6 +39,7 @@ use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AnalyticsEventsClient;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::auth::ExternalAuthRefreshContext;
@@ -206,6 +207,11 @@ impl MessageProcessor {
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let analytics_events_client =
AnalyticsEventsClient::new(Arc::clone(&config), Arc::clone(&auth_manager));
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
thread_manager: Arc::clone(&thread_manager),
@@ -223,6 +229,7 @@ impl MessageProcessor {
loader_overrides,
cloud_requirements,
thread_manager,
analytics_events_client,
);
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());

View File

@@ -0,0 +1,16 @@
use anyhow::Result;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
pub async fn start_analytics_events_server() -> Result<MockServer> {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
Ok(server)
}

View File

@@ -1,3 +1,4 @@
mod analytics_server;
mod auth_fixtures;
mod config;
mod mcp_process;
@@ -6,6 +7,7 @@ mod models_cache;
mod responses;
mod rollout;
pub use analytics_server::start_analytics_events_server;
pub use auth_fixtures::ChatGptAuthFixture;
pub use auth_fixtures::ChatGptIdTokenClaims;
pub use auth_fixtures::encode_id_token;

View File

@@ -5,7 +5,9 @@ use std::time::Duration;
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::McpProcess;
use app_test_support::start_analytics_events_server;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use axum::Json;
@@ -136,6 +138,85 @@ async fn plugin_install_returns_invalid_request_for_not_available_plugin() -> Re
Ok(())
}
#[tokio::test]
async fn plugin_install_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
let codex_home = TempDir::new()?;
write_analytics_config(codex_home.path(), &analytics_server.uri())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
None,
None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payloads = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
assert_eq!(
payload,
json!({
"events": [{
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
})
);
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
let connectors = vec![
@@ -461,6 +542,13 @@ connectors = true
)
}
fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!("chatgpt_base_url = \"{base_url}\"\n"),
)
}
fn write_plugin_marketplace(
repo_root: &std::path::Path,
marketplace_name: &str,

View File

@@ -1,13 +1,19 @@
use std::time::Duration;
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::McpProcess;
use app_test_support::start_analytics_events_server;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PluginUninstallParams;
use codex_app_server_protocol::PluginUninstallResponse;
use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -64,6 +70,78 @@ enabled = true
Ok(())
}
#[tokio::test]
async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
let codex_home = TempDir::new()?;
write_installed_plugin(&codex_home, "debug", "sample-plugin")?;
std::fs::write(
codex_home.path().join("config.toml"),
format!(
"chatgpt_base_url = \"{}\"\n\n[features]\nplugins = true\n\n[plugins.\"sample-plugin@debug\"]\nenabled = true\n",
analytics_server.uri()
),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_uninstall_request(PluginUninstallParams {
plugin_id: "sample-plugin@debug".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});
let payloads = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
assert_eq!(
payload,
json!({
"events": [{
"event_type": "codex_plugin_uninstalled",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
})
);
Ok(())
}
fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,

View File

@@ -3,6 +3,7 @@ use crate::config::Config;
use crate::default_client::create_client;
use crate::git_info::collect_git_info;
use crate::git_info::get_git_repo_root;
use crate::plugins::PluginTelemetryMetadata;
use codex_protocol::protocol::SkillScope;
use serde::Serialize;
use sha1::Digest;
@@ -59,9 +60,11 @@ pub(crate) struct AppInvocation {
pub(crate) struct AnalyticsEventsQueue {
sender: mpsc::Sender<TrackEventsJob>,
app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
}
pub(crate) struct AnalyticsEventsClient {
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: AnalyticsEventsQueue,
config: Arc<Config>,
}
@@ -81,12 +84,28 @@ impl AnalyticsEventsQueue {
TrackEventsJob::AppUsed(job) => {
send_track_app_used(&auth_manager, job).await;
}
TrackEventsJob::PluginUsed(job) => {
send_track_plugin_used(&auth_manager, job).await;
}
TrackEventsJob::PluginInstalled(job) => {
send_track_plugin_installed(&auth_manager, job).await;
}
TrackEventsJob::PluginUninstalled(job) => {
send_track_plugin_uninstalled(&auth_manager, job).await;
}
TrackEventsJob::PluginEnabled(job) => {
send_track_plugin_enabled(&auth_manager, job).await;
}
TrackEventsJob::PluginDisabled(job) => {
send_track_plugin_disabled(&auth_manager, job).await;
}
}
}
});
Self {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
}
}
@@ -110,10 +129,25 @@ impl AnalyticsEventsQueue {
}
emitted.insert((tracking.turn_id.clone(), connector_id.clone()))
}
fn should_enqueue_plugin_used(
&self,
tracking: &TrackEventsContext,
plugin: &PluginTelemetryMetadata,
) -> bool {
let mut emitted = self
.plugin_used_emitted_keys
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if emitted.len() >= ANALYTICS_APP_USED_DEDUPE_MAX_KEYS {
emitted.clear();
}
emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.clone()))
}
}
impl AnalyticsEventsClient {
pub(crate) fn new(config: Arc<Config>, auth_manager: Arc<AuthManager>) -> Self {
pub fn new(config: Arc<Config>, auth_manager: Arc<AuthManager>) -> Self {
Self {
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager)),
config,
@@ -149,12 +183,66 @@ impl AnalyticsEventsClient {
pub(crate) fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
track_app_used(&self.queue, Arc::clone(&self.config), Some(tracking), app);
}
pub(crate) fn track_plugin_used(
&self,
tracking: TrackEventsContext,
plugin: PluginTelemetryMetadata,
) {
track_plugin_used(
&self.queue,
Arc::clone(&self.config),
Some(tracking),
plugin,
);
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
Arc::clone(&self.config),
PluginManagementEventType::Installed,
plugin,
);
}
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
Arc::clone(&self.config),
PluginManagementEventType::Uninstalled,
plugin,
);
}
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
Arc::clone(&self.config),
PluginManagementEventType::Enabled,
plugin,
);
}
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
Arc::clone(&self.config),
PluginManagementEventType::Disabled,
plugin,
);
}
}
enum TrackEventsJob {
SkillInvocations(TrackSkillInvocationsJob),
AppMentioned(TrackAppMentionedJob),
AppUsed(TrackAppUsedJob),
PluginUsed(TrackPluginUsedJob),
PluginInstalled(TrackPluginManagementJob),
PluginUninstalled(TrackPluginManagementJob),
PluginEnabled(TrackPluginManagementJob),
PluginDisabled(TrackPluginManagementJob),
}
struct TrackSkillInvocationsJob {
@@ -175,6 +263,25 @@ struct TrackAppUsedJob {
app: AppInvocation,
}
struct TrackPluginUsedJob {
config: Arc<Config>,
tracking: TrackEventsContext,
plugin: PluginTelemetryMetadata,
}
struct TrackPluginManagementJob {
config: Arc<Config>,
plugin: PluginTelemetryMetadata,
}
#[derive(Clone, Copy)]
enum PluginManagementEventType {
Installed,
Uninstalled,
Enabled,
Disabled,
}
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
const ANALYTICS_APP_USED_DEDUPE_MAX_KEYS: usize = 4096;
@@ -190,6 +297,11 @@ enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
PluginEnabled(CodexPluginEventRequest),
PluginDisabled(CodexPluginEventRequest),
}
#[derive(Serialize)]
@@ -233,6 +345,38 @@ struct CodexAppUsedEventRequest {
event_params: CodexAppMetadata,
}
#[derive(Serialize)]
struct CodexPluginMetadata {
plugin_id: Option<String>,
plugin_name: Option<String>,
marketplace_name: Option<String>,
has_skills: Option<bool>,
mcp_server_count: Option<usize>,
connector_ids: Option<Vec<String>>,
product_client_id: Option<String>,
}
#[derive(Serialize)]
struct CodexPluginUsedMetadata {
#[serde(flatten)]
plugin: CodexPluginMetadata,
thread_id: Option<String>,
turn_id: Option<String>,
model_slug: Option<String>,
}
#[derive(Serialize)]
struct CodexPluginEventRequest {
event_type: &'static str,
event_params: CodexPluginMetadata,
}
#[derive(Serialize)]
struct CodexPluginUsedEventRequest {
event_type: &'static str,
event_params: CodexPluginUsedMetadata,
}
pub(crate) fn track_skill_invocations(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
@@ -302,6 +446,48 @@ pub(crate) fn track_app_used(
queue.try_send(job);
}
pub(crate) fn track_plugin_used(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
tracking: Option<TrackEventsContext>,
plugin: PluginTelemetryMetadata,
) {
if config.analytics_enabled == Some(false) {
return;
}
let Some(tracking) = tracking else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
let job = TrackEventsJob::PluginUsed(TrackPluginUsedJob {
config,
tracking,
plugin,
});
queue.try_send(job);
}
fn track_plugin_management(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
event_type: PluginManagementEventType,
plugin: PluginTelemetryMetadata,
) {
if config.analytics_enabled == Some(false) {
return;
}
let job = TrackPluginManagementJob { config, plugin };
let job = match event_type {
PluginManagementEventType::Installed => TrackEventsJob::PluginInstalled(job),
PluginManagementEventType::Uninstalled => TrackEventsJob::PluginUninstalled(job),
PluginManagementEventType::Enabled => TrackEventsJob::PluginEnabled(job),
PluginManagementEventType::Disabled => TrackEventsJob::PluginDisabled(job),
};
queue.try_send(job);
}
async fn send_track_skill_invocations(auth_manager: &AuthManager, job: TrackSkillInvocationsJob) {
let TrackSkillInvocationsJob {
config,
@@ -385,6 +571,58 @@ async fn send_track_app_used(auth_manager: &AuthManager, job: TrackAppUsedJob) {
send_track_events(auth_manager, config, events).await;
}
async fn send_track_plugin_used(auth_manager: &AuthManager, job: TrackPluginUsedJob) {
let TrackPluginUsedJob {
config,
tracking,
plugin,
} = job;
let events = vec![TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: codex_plugin_used_metadata(&tracking, plugin),
})];
send_track_events(auth_manager, config, events).await;
}
async fn send_track_plugin_installed(auth_manager: &AuthManager, job: TrackPluginManagementJob) {
send_track_plugin_management_event(auth_manager, job, "codex_plugin_installed").await;
}
async fn send_track_plugin_uninstalled(auth_manager: &AuthManager, job: TrackPluginManagementJob) {
send_track_plugin_management_event(auth_manager, job, "codex_plugin_uninstalled").await;
}
async fn send_track_plugin_enabled(auth_manager: &AuthManager, job: TrackPluginManagementJob) {
send_track_plugin_management_event(auth_manager, job, "codex_plugin_enabled").await;
}
async fn send_track_plugin_disabled(auth_manager: &AuthManager, job: TrackPluginManagementJob) {
send_track_plugin_management_event(auth_manager, job, "codex_plugin_disabled").await;
}
async fn send_track_plugin_management_event(
auth_manager: &AuthManager,
job: TrackPluginManagementJob,
event_type: &'static str,
) {
let TrackPluginManagementJob { config, plugin } = job;
let event_params = codex_plugin_metadata(plugin);
let event = CodexPluginEventRequest {
event_type,
event_params,
};
let events = vec![match event_type {
"codex_plugin_installed" => TrackEventRequest::PluginInstalled(event),
"codex_plugin_uninstalled" => TrackEventRequest::PluginUninstalled(event),
"codex_plugin_enabled" => TrackEventRequest::PluginEnabled(event),
"codex_plugin_disabled" => TrackEventRequest::PluginDisabled(event),
_ => unreachable!("unknown plugin management event type"),
}];
send_track_events(auth_manager, config, events).await;
}
fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata {
CodexAppMetadata {
connector_id: app.connector_id,
@@ -397,6 +635,30 @@ fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> Code
}
}
fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata {
CodexPluginMetadata {
plugin_id: Some(plugin.plugin_id),
plugin_name: plugin.plugin_name,
marketplace_name: plugin.marketplace_name,
has_skills: plugin.has_skills,
mcp_server_count: plugin.mcp_server_count,
connector_ids: plugin.connector_ids,
product_client_id: Some(crate::default_client::originator().value),
}
}
fn codex_plugin_used_metadata(
tracking: &TrackEventsContext,
plugin: PluginTelemetryMetadata,
) -> CodexPluginUsedMetadata {
CodexPluginUsedMetadata {
plugin: codex_plugin_metadata(plugin),
thread_id: Some(tracking.thread_id.clone()),
turn_id: Some(tracking.turn_id.clone()),
model_slug: Some(tracking.model_slug.clone()),
}
}
async fn send_track_events(
auth_manager: &AuthManager,
config: Arc<Config>,

View File

@@ -2,11 +2,16 @@ use super::AnalyticsEventsQueue;
use super::AppInvocation;
use super::CodexAppMentionedEventRequest;
use super::CodexAppUsedEventRequest;
use super::CodexPluginEventRequest;
use super::CodexPluginUsedEventRequest;
use super::InvocationType;
use super::TrackEventRequest;
use super::TrackEventsContext;
use super::codex_app_metadata;
use super::codex_plugin_metadata;
use super::codex_plugin_used_metadata;
use super::normalize_path_for_skill_id;
use crate::plugins::PluginTelemetryMetadata;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::HashSet;
@@ -153,6 +158,7 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
let queue = AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
};
let app = AppInvocation {
connector_id: Some("calendar".to_string()),
@@ -175,3 +181,100 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), false);
assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true);
}
#[test]
fn plugin_used_event_serializes_expected_shape() {
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-3".to_string(),
turn_id: "turn-3".to_string(),
};
let event = TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: codex_plugin_used_metadata(&tracking, sample_plugin_metadata()),
});
let payload = serde_json::to_value(&event).expect("serialize plugin used event");
assert_eq!(
payload,
json!({
"event_type": "codex_plugin_used",
"event_params": {
"plugin_id": "sample@test",
"plugin_name": "sample",
"marketplace_name": "test",
"has_skills": true,
"mcp_server_count": 2,
"connector_ids": ["calendar", "drive"],
"product_client_id": crate::default_client::originator().value,
"thread_id": "thread-3",
"turn_id": "turn-3",
"model_slug": "gpt-5"
}
})
);
}
#[test]
fn plugin_management_event_serializes_expected_shape() {
let event = TrackEventRequest::PluginInstalled(CodexPluginEventRequest {
event_type: "codex_plugin_installed",
event_params: codex_plugin_metadata(sample_plugin_metadata()),
});
let payload = serde_json::to_value(&event).expect("serialize plugin installed event");
assert_eq!(
payload,
json!({
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "sample@test",
"plugin_name": "sample",
"marketplace_name": "test",
"has_skills": true,
"mcp_server_count": 2,
"connector_ids": ["calendar", "drive"],
"product_client_id": crate::default_client::originator().value
}
})
);
}
#[test]
fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() {
let (sender, _receiver) = mpsc::channel(1);
let queue = AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
};
let plugin = sample_plugin_metadata();
let turn_1 = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
};
let turn_2 = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-2".to_string(),
};
assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), true);
assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), false);
assert_eq!(queue.should_enqueue_plugin_used(&turn_2, &plugin), true);
}
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
PluginTelemetryMetadata {
plugin_id: "sample@test".to_string(),
plugin_name: Some("sample".to_string()),
marketplace_name: Some("test".to_string()),
has_skills: Some(true),
mcp_server_count: Some(2),
connector_ids: Some(vec!["calendar".to_string(), "drive".to_string()]),
}
}

View File

@@ -5482,6 +5482,10 @@ pub(crate) async fn run_turn(
let plugin_items =
build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors);
let mentioned_plugin_metadata = mentioned_plugins
.iter()
.map(crate::plugins::PluginCapabilitySummary::telemetry_metadata)
.collect::<Vec<_>>();
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
@@ -5520,6 +5524,11 @@ pub(crate) async fn run_turn(
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
for plugin in mentioned_plugin_metadata {
sess.services
.analytics_events_client
.track_plugin_used(tracking.clone(), plugin);
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;

View File

@@ -101,6 +101,7 @@ pub type NewConversation = NewThread;
#[deprecated(note = "use CodexThread")]
pub type CodexConversation = CodexThread;
// Re-export common auth types for workspace consumers
pub use analytics_client::AnalyticsEventsClient;
pub use auth::AuthManager;
pub use auth::CodexAuth;
pub mod default_client;

View File

@@ -19,6 +19,7 @@ use super::store::PluginInstallResult as StorePluginInstallResult;
use super::store::PluginStore;
use super::store::PluginStoreError;
use super::sync_openai_plugins_repo;
use crate::analytics_client::AnalyticsEventsClient;
use crate::auth::CodexAuth;
use crate::config::Config;
use crate::config::ConfigService;
@@ -160,6 +161,29 @@ pub struct PluginCapabilitySummary {
pub app_connector_ids: Vec<AppConnectorId>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginTelemetryMetadata {
pub plugin_id: String,
pub plugin_name: Option<String>,
pub marketplace_name: Option<String>,
pub has_skills: Option<bool>,
pub mcp_server_count: Option<usize>,
pub connector_ids: Option<Vec<String>>,
}
impl PluginTelemetryMetadata {
pub fn from_plugin_id(plugin_id: &PluginId) -> Self {
Self {
plugin_id: plugin_id.as_key(),
plugin_name: Some(plugin_id.plugin_name.clone()),
marketplace_name: Some(plugin_id.marketplace_name.clone()),
has_skills: None,
mcp_server_count: None,
connector_ids: None,
}
}
}
impl PluginCapabilitySummary {
fn from_plugin(plugin: &LoadedPlugin) -> Option<Self> {
if !plugin.is_active() {
@@ -186,6 +210,37 @@ impl PluginCapabilitySummary {
|| !summary.app_connector_ids.is_empty())
.then_some(summary)
}
pub fn telemetry_metadata(&self) -> PluginTelemetryMetadata {
match PluginId::parse(&self.config_name) {
Ok(plugin_id) => PluginTelemetryMetadata {
plugin_id: plugin_id.as_key(),
plugin_name: Some(plugin_id.plugin_name),
marketplace_name: Some(plugin_id.marketplace_name),
has_skills: Some(self.has_skills),
mcp_server_count: Some(self.mcp_server_names.len()),
connector_ids: Some(
self.app_connector_ids
.iter()
.map(|connector_id| connector_id.0.clone())
.collect(),
),
},
Err(_) => PluginTelemetryMetadata {
plugin_id: self.config_name.clone(),
plugin_name: Some(self.display_name.clone()),
marketplace_name: None,
has_skills: Some(self.has_skills),
mcp_server_count: Some(self.mcp_server_names.len()),
connector_ids: Some(
self.app_connector_ids
.iter()
.map(|connector_id| connector_id.0.clone())
.collect(),
),
},
}
}
}
#[derive(Debug, Clone, PartialEq)]
@@ -369,6 +424,7 @@ pub struct PluginsManager {
codex_home: PathBuf,
store: PluginStore,
cache_by_cwd: RwLock<HashMap<PathBuf, PluginLoadOutcome>>,
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}
impl PluginsManager {
@@ -377,6 +433,22 @@ impl PluginsManager {
codex_home: codex_home.clone(),
store: PluginStore::new(codex_home),
cache_by_cwd: RwLock::new(HashMap::new()),
analytics_events_client: RwLock::new(None),
}
}
pub fn set_analytics_events_client(&self, analytics_events_client: AnalyticsEventsClient) {
let mut stored_client = match self.analytics_events_client.write() {
Ok(client_guard) => client_guard,
Err(err) => err.into_inner(),
};
*stored_client = Some(analytics_events_client);
}
fn analytics_events_client(&self) -> Option<AnalyticsEventsClient> {
match self.analytics_events_client.read() {
Ok(client) => client.clone(),
Err(err) => err.into_inner().clone(),
}
}
@@ -466,6 +538,13 @@ impl PluginsManager {
.map(|_| ())
.map_err(PluginInstallError::from)?;
if let Some(analytics_events_client) = self.analytics_events_client() {
analytics_events_client.track_plugin_installed(plugin_telemetry_metadata_from_root(
&result.plugin_id,
result.installed_path.as_path(),
));
}
Ok(PluginInstallOutcome {
plugin_id: result.plugin_id,
plugin_version: result.plugin_version,
@@ -476,6 +555,10 @@ impl PluginsManager {
pub async fn uninstall_plugin(&self, plugin_id: String) -> Result<(), PluginUninstallError> {
let plugin_id = PluginId::parse(&plugin_id)?;
let plugin_telemetry = self
.store
.active_plugin_root(&plugin_id)
.map(|_| installed_plugin_telemetry_metadata(self.codex_home.as_path(), &plugin_id));
let store = self.store.clone();
let plugin_id_for_store = plugin_id.clone();
tokio::task::spawn_blocking(move || store.uninstall(&plugin_id_for_store))
@@ -489,6 +572,12 @@ impl PluginsManager {
.apply()
.await?;
if let Some(plugin_telemetry) = plugin_telemetry
&& let Some(analytics_events_client) = self.analytics_events_client()
{
analytics_events_client.track_plugin_uninstalled(plugin_telemetry);
}
Ok(())
}
@@ -1360,6 +1449,52 @@ fn load_apps_from_paths(
connector_ids
}
pub fn plugin_telemetry_metadata_from_root(
plugin_id: &PluginId,
plugin_root: &Path,
) -> PluginTelemetryMetadata {
let Some(manifest) = load_plugin_manifest(plugin_root) else {
return PluginTelemetryMetadata::from_plugin_id(plugin_id);
};
let manifest_paths = plugin_manifest_paths(&manifest, plugin_root);
let has_skills = !plugin_skill_roots(plugin_root, &manifest_paths).is_empty();
let mcp_server_count = manifest_paths
.mcp_servers
.as_ref()
.map(|path| {
load_mcp_servers_from_file(plugin_root, path)
.mcp_servers
.len()
})
.unwrap_or(0);
let connector_ids = load_plugin_apps(plugin_root)
.into_iter()
.map(|connector_id| connector_id.0)
.collect();
PluginTelemetryMetadata {
plugin_id: plugin_id.as_key(),
plugin_name: Some(plugin_id.plugin_name.clone()),
marketplace_name: Some(plugin_id.marketplace_name.clone()),
has_skills: Some(has_skills),
mcp_server_count: Some(mcp_server_count),
connector_ids: Some(connector_ids),
}
}
pub fn installed_plugin_telemetry_metadata(
codex_home: &Path,
plugin_id: &PluginId,
) -> PluginTelemetryMetadata {
let store = PluginStore::new(codex_home.to_path_buf());
let Some(plugin_root) = store.active_plugin_root(plugin_id) else {
return PluginTelemetryMetadata::from_plugin_id(plugin_id);
};
plugin_telemetry_metadata_from_root(plugin_id, plugin_root.as_path())
}
fn load_mcp_servers_from_file(
plugin_root: &Path,
mcp_config_path: &AbsolutePathBuf,

View File

@@ -23,11 +23,14 @@ pub use manager::PluginLoadOutcome;
pub use manager::PluginReadOutcome;
pub use manager::PluginReadRequest;
pub use manager::PluginRemoteSyncError;
pub use manager::PluginTelemetryMetadata;
pub use manager::PluginUninstallError;
pub use manager::PluginsManager;
pub use manager::RemotePluginSyncResult;
pub use manager::installed_plugin_telemetry_metadata;
pub use manager::load_plugin_apps;
pub(crate) use manager::plugin_namespace_for_skill_path;
pub use manager::plugin_telemetry_metadata_from_root;
pub use manifest::PluginManifestInterfaceSummary;
pub(crate) use manifest::PluginManifestPaths;
pub(crate) use manifest::load_plugin_manifest;

View File

@@ -107,6 +107,24 @@ async fn build_plugin_test_codex(
.codex)
}
async fn build_analytics_plugin_test_codex(
server: &MockServer,
codex_home: Arc<TempDir>,
) -> Result<Arc<codex_core::CodexThread>> {
let chatgpt_base_url = server.uri();
let mut builder = test_codex()
.with_home(codex_home)
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.chatgpt_base_url = chatgpt_base_url;
});
Ok(builder
.build(server)
.await
.expect("create new conversation")
.codex)
}
async fn build_apps_enabled_plugin_test_codex(
server: &MockServer,
codex_home: Arc<TempDir>,
@@ -299,6 +317,70 @@ async fn explicit_plugin_mentions_inject_plugin_guidance() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_track_plugin_used_analytics() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _resp_mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let codex_home = Arc::new(TempDir::new()?);
write_plugin_skill_plugin(codex_home.as_ref());
let codex = build_analytics_plugin_test_codex(&server, codex_home).await?;
codex
.submit(Op::UserInput {
items: vec![codex_protocol::user_input::UserInput::Mention {
name: "sample".into(),
path: format!("plugin://{SAMPLE_PLUGIN_CONFIG_NAME}"),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let deadline = Instant::now() + Duration::from_secs(10);
let analytics_request = loop {
let requests = server.received_requests().await.unwrap_or_default();
if let Some(request) = requests
.into_iter()
.find(|request| request.url.path() == "/codex/analytics-events/events")
{
break request;
}
if Instant::now() >= deadline {
panic!("timed out waiting for plugin analytics request");
}
tokio::time::sleep(Duration::from_millis(50)).await;
};
let payload: serde_json::Value =
serde_json::from_slice(&analytics_request.body).expect("analytics payload");
let event = &payload["events"][0];
assert_eq!(event["event_type"], "codex_plugin_used");
assert_eq!(event["event_params"]["plugin_id"], "sample@test");
assert_eq!(event["event_params"]["plugin_name"], "sample");
assert_eq!(event["event_params"]["marketplace_name"], "test");
assert_eq!(event["event_params"]["has_skills"], true);
assert_eq!(event["event_params"]["mcp_server_count"], 0);
assert_eq!(
event["event_params"]["connector_ids"],
serde_json::json!([])
);
assert_eq!(
event["event_params"]["product_client_id"],
serde_json::json!(codex_core::default_client::originator().value)
);
assert_eq!(event["event_params"]["model_slug"], "gpt-5");
assert!(event["event_params"]["thread_id"].as_str().is_some());
assert!(event["event_params"]["turn_id"].as_str().is_some());
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn plugin_mcp_tools_are_listed() -> Result<()> {
skip_if_no_network!(Ok(()));