Simplify compaction analytics reducer emission

This commit is contained in:
Roy Han
2026-04-08 15:24:55 -07:00
parent 6636449efa
commit 61122ffaac
2 changed files with 22 additions and 54 deletions

View File

@@ -521,31 +521,30 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
}
#[tokio::test]
async fn compaction_event_waits_for_thread_connection_metadata() {
async fn compaction_event_uses_existing_thread_connection_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let input = || CodexCompactionEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
mode: CompactionMode::Local,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
};
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
mode: CompactionMode::Local,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(input()))),
&mut events,
)
.await;
assert!(
events.is_empty(),
"compaction events should wait for client/runtime metadata"
"compaction events need existing client/runtime metadata"
);
reducer
@@ -587,6 +586,13 @@ async fn compaction_event_waits_for_thread_connection_metadata() {
)
.await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(input()))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 2);
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");

View File

@@ -44,7 +44,6 @@ use std::path::Path;
pub(crate) struct AnalyticsReducer {
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
pending_compactions: HashMap<String, Vec<CodexCompactionEvent>>,
}
struct ConnectionState {
@@ -285,45 +284,9 @@ impl AnalyticsReducer {
},
},
));
self.maybe_emit_pending_compactions(&thread_id, out);
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let thread_id = input.thread_id.clone();
if self.try_emit_compaction(input.clone(), out) {
return;
}
self.pending_compactions
.entry(thread_id)
.or_default()
.push(input);
}
fn maybe_emit_pending_compactions(
&mut self,
thread_id: &str,
out: &mut Vec<TrackEventRequest>,
) {
let Some(pending) = self.pending_compactions.remove(thread_id) else {
return;
};
let mut still_pending = Vec::new();
for input in pending {
if !self.try_emit_compaction(input.clone(), out) {
still_pending.push(input);
}
}
if !still_pending.is_empty() {
self.pending_compactions
.insert(thread_id.to_string(), still_pending);
}
}
fn try_emit_compaction(
&mut self,
input: CodexCompactionEvent,
out: &mut Vec<TrackEventRequest>,
) -> bool {
let connection_metadata = self
.thread_connections
.get(&input.thread_id)
@@ -335,7 +298,7 @@ impl AnalyticsReducer {
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
return false;
return;
};
out.push(TrackEventRequest::Compaction(Box::new(
CodexCompactionEventRequest {
@@ -343,7 +306,6 @@ impl AnalyticsReducer {
event_params: codex_compaction_event_params(app_server_client, runtime, input),
},
)));
true
}
}