[codex-analytics] add compaction analytics event

This commit is contained in:
rhan-oai
2026-04-08 13:10:12 -07:00
committed by Roy Han
parent 598d6ff056
commit d9757c306e
9 changed files with 565 additions and 10 deletions

View File

@@ -2,6 +2,7 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
@@ -12,6 +13,7 @@ use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_compaction_event_params;
use crate::events::codex_plugin_metadata;
use crate::events::codex_plugin_used_metadata;
use crate::events::plugin_state_event_type;
@@ -20,6 +22,7 @@ use crate::events::thread_source_name;
use crate::facts::AnalyticsFact;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CustomAnalyticsFact;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
@@ -40,6 +43,8 @@ use std::path::Path;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
pending_compactions: HashMap<String, Vec<CodexCompactionEvent>>,
}
struct ConnectionState {
@@ -81,6 +86,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::Compaction(input) => {
self.ingest_compaction(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -254,6 +262,9 @@ impl AnalyticsReducer {
_ => return,
};
let thread_source: SessionSource = thread.source.into();
let thread_id = thread.id;
self.thread_connections
.insert(thread_id.clone(), connection_id);
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
@@ -261,7 +272,7 @@ impl AnalyticsReducer {
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id: thread.id,
thread_id: thread_id.clone(),
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
model,
@@ -274,6 +285,65 @@ impl AnalyticsReducer {
},
},
));
self.maybe_emit_pending_compactions(&thread_id, out);
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let thread_id = input.thread_id.clone();
if self.try_emit_compaction(input.clone(), out) {
return;
}
self.pending_compactions
.entry(thread_id)
.or_default()
.push(input);
}
fn maybe_emit_pending_compactions(
&mut self,
thread_id: &str,
out: &mut Vec<TrackEventRequest>,
) {
let Some(pending) = self.pending_compactions.remove(thread_id) else {
return;
};
let mut still_pending = Vec::new();
for input in pending {
if !self.try_emit_compaction(input.clone(), out) {
still_pending.push(input);
}
}
if !still_pending.is_empty() {
self.pending_compactions
.insert(thread_id.to_string(), still_pending);
}
}
fn try_emit_compaction(
&mut self,
input: CodexCompactionEvent,
out: &mut Vec<TrackEventRequest>,
) -> bool {
let connection_metadata = self
.thread_connections
.get(&input.thread_id)
.and_then(|connection_id| self.connections.get(connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
return false;
};
out.push(TrackEventRequest::Compaction(Box::new(
CodexCompactionEventRequest {
event_type: "codex_compaction_event",
event_params: codex_compaction_event_params(app_server_client, runtime, input),
},
)));
true
}
}