Files
codex/codex-rs/analytics/src/reducer.rs
rhan-oai e8de4ea953 [codex-analytics] thread events (#15690)
- add event for thread initialization
- thread/start, thread/fork, thread/resume
- feature flagged behind `FeatureFlag::GeneralAnalytics`
- does not yet support threads started by subagents

PR stack:
- --> [[telemetry] thread events
#15690](https://github.com/openai/codex/pull/15690)
- [[telemetry] subagent events
#15915](https://github.com/openai/codex/pull/15915)
- [[telemetry] turn events
#15591](https://github.com/openai/codex/pull/15591)
- [[telemetry] steer events
#15697](https://github.com/openai/codex/pull/15697)
- [[telemetry] queued prompt data
#15804](https://github.com/openai/codex/pull/15804)


Sample extracted logs in Codex-backend
```
INFO     | 2026-03-29 16:39:37 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3bf7-9f5f-7f82-9877-6d48d1052531 product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=new subagent_source=None parent_thread_id=None created_at=1774827577 | 
INFO     | 2026-03-29 16:45:46 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3b84-5731-79d0-9b3b-9c6efe5f5066 product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=resumed subagent_source=None parent_thread_id=None created_at=1774820022 | 
INFO     | 2026-03-29 16:45:49 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3bfd-4cd6-7c12-a13e-48cef02e8c4d product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=forked subagent_source=None parent_thread_id=None created_at=1774827949 | 
INFO     | 2026-03-29 17:20:29 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3c1d-0412-7ed2-ad24-c9c0881a36b0 product_surface=codex product_client_id=CODEX_SERVICE_EXEC client_name=codex_exec client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=new subagent_source=None parent_thread_id=None created_at=1774830027 | 
```

Notes
- `product_client_id` gets canonicalized in codex-backend
- subagent threads are addressed in a following pr
2026-03-31 12:16:44 -07:00

306 lines
11 KiB
Rust

use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_plugin_metadata;
use crate::events::codex_plugin_used_metadata;
use crate::events::plugin_state_event_type;
use crate::events::thread_source_name;
use crate::facts::AnalyticsFact;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
use crate::facts::PluginUsedInput;
use crate::facts::SkillInvokedInput;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope;
use sha1::Digest;
use std::collections::HashMap;
use std::path::Path;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
connections: HashMap<u64, ConnectionState>,
}
struct ConnectionState {
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
}
impl AnalyticsReducer {
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
match input {
AnalyticsFact::Initialize {
connection_id,
params,
product_client_id,
runtime,
rpc_transport,
} => {
self.ingest_initialize(
connection_id,
params,
product_client_id,
runtime,
rpc_transport,
);
}
AnalyticsFact::Request {
connection_id: _connection_id,
request_id: _request_id,
request: _request,
} => {}
AnalyticsFact::Response {
connection_id,
response,
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::Notification(_notification) => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
CustomAnalyticsFact::AppMentioned(input) => {
self.ingest_app_mentioned(input, out);
}
CustomAnalyticsFact::AppUsed(input) => {
self.ingest_app_used(input, out);
}
CustomAnalyticsFact::PluginUsed(input) => {
self.ingest_plugin_used(input, out);
}
CustomAnalyticsFact::PluginStateChanged(input) => {
self.ingest_plugin_state_changed(input, out);
}
},
}
}
fn ingest_initialize(
&mut self,
connection_id: u64,
params: InitializeParams,
product_client_id: String,
runtime: CodexRuntimeMetadata,
rpc_transport: AppServerRpcTransport,
) {
self.connections.insert(
connection_id,
ConnectionState {
app_server_client: CodexAppServerClientMetadata {
product_client_id,
client_name: Some(params.client_info.name),
client_version: Some(params.client_info.version),
rpc_transport,
experimental_api_enabled: params
.capabilities
.map(|capabilities| capabilities.experimental_api),
},
runtime,
},
);
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,
out: &mut Vec<TrackEventRequest>,
) {
let SkillInvokedInput {
tracking,
invocations,
} = input;
for invocation in invocations {
let skill_scope = match invocation.skill_scope {
SkillScope::User => "user",
SkillScope::Repo => "repo",
SkillScope::System => "system",
SkillScope::Admin => "admin",
};
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
let repo_url = if let Some(root) = repo_root.as_ref() {
collect_git_info(root)
.await
.and_then(|info| info.repository_url)
} else {
None
};
let skill_id = skill_id_for_local_skill(
repo_url.as_deref(),
repo_root.as_deref(),
invocation.skill_path.as_path(),
invocation.skill_name.as_str(),
);
out.push(TrackEventRequest::SkillInvocation(
SkillInvocationEventRequest {
event_type: "skill_invocation",
skill_id,
skill_name: invocation.skill_name.clone(),
event_params: SkillInvocationEventParams {
thread_id: Some(tracking.thread_id.clone()),
invoke_type: Some(invocation.invocation_type),
model_slug: Some(tracking.model_slug.clone()),
product_client_id: Some(originator().value),
repo_url,
skill_scope: Some(skill_scope.to_string()),
},
},
));
}
}
fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec<TrackEventRequest>) {
let AppMentionedInput { tracking, mentions } = input;
out.extend(mentions.into_iter().map(|mention| {
let event_params = codex_app_metadata(&tracking, mention);
TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest {
event_type: "codex_app_mentioned",
event_params,
})
}));
}
fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec<TrackEventRequest>) {
let AppUsedInput { tracking, app } = input;
let event_params = codex_app_metadata(&tracking, app);
out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
event_type: "codex_app_used",
event_params,
}));
}
fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec<TrackEventRequest>) {
let PluginUsedInput { tracking, plugin } = input;
out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: codex_plugin_used_metadata(&tracking, plugin),
}));
}
fn ingest_plugin_state_changed(
&mut self,
input: PluginStateChangedInput,
out: &mut Vec<TrackEventRequest>,
) {
let PluginStateChangedInput { plugin, state } = input;
let event = CodexPluginEventRequest {
event_type: plugin_state_event_type(state),
event_params: codex_plugin_metadata(plugin),
};
out.push(match state {
PluginState::Installed => TrackEventRequest::PluginInstalled(event),
PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
});
}
fn ingest_response(
&mut self,
connection_id: u64,
response: ClientResponse,
out: &mut Vec<TrackEventRequest>,
) {
let (thread, model, initialization_mode) = match response {
ClientResponse::ThreadStart { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::New,
),
ClientResponse::ThreadResume { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Resumed,
),
ClientResponse::ThreadFork { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Forked,
),
_ => return,
};
let thread_source: SessionSource = thread.source.into();
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id: thread.id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
model,
ephemeral: thread.ephemeral,
thread_source: thread_source_name(&thread_source),
initialization_mode,
subagent_source: None,
parent_thread_id: None,
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
},
));
}
}
pub(crate) fn skill_id_for_local_skill(
repo_url: Option<&str>,
repo_root: Option<&Path>,
skill_path: &Path,
skill_name: &str,
) -> String {
let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path);
let prefix = if let Some(url) = repo_url {
format!("repo_{url}")
} else {
"personal".to_string()
};
let raw_id = format!("{prefix}_{path}_{skill_name}");
let mut hasher = sha1::Sha1::new();
sha1::Digest::update(&mut hasher, raw_id.as_bytes());
format!("{:x}", sha1::Digest::finalize(hasher))
}
/// Returns a normalized path for skill ID construction.
///
/// - Repo-scoped skills use a path relative to the repo root.
/// - User/admin/system skills use an absolute path.
pub(crate) fn normalize_path_for_skill_id(
repo_url: Option<&str>,
repo_root: Option<&Path>,
skill_path: &Path,
) -> String {
let resolved_path =
std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf());
match (repo_url, repo_root) {
(Some(_), Some(root)) => {
let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
resolved_path
.strip_prefix(&resolved_root)
.unwrap_or(resolved_path.as_path())
.to_string_lossy()
.replace('\\', "/")
}
_ => resolved_path.to_string_lossy().replace('\\', "/"),
}
}