diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 33e89df011..e9fd03f924 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer { requests: HashMap<(u64, RequestId), RequestState>, turns: HashMap, connections: HashMap, - thread_connections: HashMap, - thread_metadata: HashMap, + threads: HashMap, } struct ConnectionState { @@ -83,6 +82,84 @@ struct ConnectionState { runtime: CodexRuntimeMetadata, } +#[derive(Default)] +struct ThreadAnalyticsState { + connection_id: Option, + metadata: Option, +} + +struct ResolvedAnalyticsContext<'a> { + connection_state: &'a ConnectionState, + thread_metadata: Option<&'a ThreadMetadataState>, +} + +enum AnalyticsConnectionSource<'a> { + Thread(&'a str), + Known(u64), + MaybeKnown(Option), +} + +enum ThreadMetadataRequirement<'a> { + NotRequired, + Required(&'a str), +} + +struct AnalyticsDropSite<'a> { + event_name: &'static str, + thread_id: &'a str, + turn_id: Option<&'a str>, + review_id: Option<&'a str>, + item_id: Option<&'a str>, +} + +impl<'a> AnalyticsDropSite<'a> { + fn guardian(input: &'a GuardianReviewEventParams) -> Self { + Self { + event_name: "guardian", + thread_id: &input.thread_id, + turn_id: Some(&input.turn_id), + review_id: Some(&input.review_id), + item_id: None, + } + } + + fn compaction(input: &'a CodexCompactionEvent) -> Self { + Self { + event_name: "compaction", + thread_id: &input.thread_id, + turn_id: Some(&input.turn_id), + review_id: None, + item_id: None, + } + } + + fn turn_steer(thread_id: &'a str) -> Self { + Self { + event_name: "turn steer", + thread_id, + turn_id: None, + review_id: None, + item_id: None, + } + } + + fn turn(thread_id: &'a str, turn_id: &'a str) -> Self { + Self { + event_name: "turn", + thread_id, + turn_id: Some(turn_id), + review_id: None, + item_id: None, + } + } +} + +enum MissingAnalyticsContext { + ThreadConnection, + Connection { connection_id: u64 }, + ThreadMetadata, +} + #[derive(Clone)] struct ThreadMetadataState { thread_source: Option<&'static str>, @@ -271,6 +348,22 @@ impl AnalyticsReducer { input: SubAgentThreadStartedInput, out: &mut Vec, ) { + let parent_thread_id = input + .parent_thread_id + .clone() + .or_else(|| subagent_parent_thread_id(&input.subagent_source)); + let parent_connection_id = parent_thread_id + .as_ref() + .and_then(|parent_thread_id| self.threads.get(parent_thread_id)) + .and_then(|thread| thread.connection_id); + let thread_state = self.threads.entry(input.thread_id.clone()).or_default(); + thread_state.metadata = Some(ThreadMetadataState { + thread_source: Some("subagent"), + initialization_mode: ThreadInitializationMode::New, + subagent_source: Some(subagent_source_name(&input.subagent_source)), + parent_thread_id, + }); + thread_state.connection_id = parent_connection_id; out.push(TrackEventRequest::ThreadInitialized( subagent_thread_started_event_request(input), )); @@ -281,23 +374,15 @@ impl AnalyticsReducer { input: GuardianReviewEventParams, out: &mut Vec, ) { - let Some(connection_id) = self.thread_connections.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - review_id = %input.review_id, - "dropping guardian analytics event: missing thread connection metadata" - ); - return; - }; - let Some(connection_state) = self.connections.get(connection_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - review_id = %input.review_id, - connection_id, - "dropping guardian analytics event: missing connection metadata" - ); + let Some(ResolvedAnalyticsContext { + connection_state, + thread_metadata: None, + }) = self.resolve_analytics_context( + AnalyticsDropSite::guardian(&input), + AnalyticsConnectionSource::Thread(&input.thread_id), + ThreadMetadataRequirement::NotRequired, + ) + else { return; }; out.push(TrackEventRequest::GuardianReview(Box::new( @@ -683,10 +768,13 @@ impl AnalyticsReducer { }; let thread_metadata = ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode); - self.thread_connections - .insert(thread_id.clone(), connection_id); - self.thread_metadata - .insert(thread_id.clone(), thread_metadata.clone()); + self.threads.insert( + thread_id.clone(), + ThreadAnalyticsState { + connection_id: Some(connection_id), + metadata: Some(thread_metadata.clone()), + }, + ); out.push(TrackEventRequest::ThreadInitialized( ThreadInitializedEvent { event_type: "codex_thread_initialized", @@ -707,29 +795,15 @@ impl AnalyticsReducer { } fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec) { - let Some(connection_id) = self.thread_connections.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - "dropping compaction analytics event: missing thread connection metadata" - ); - return; - }; - let Some(connection_state) = self.connections.get(connection_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - connection_id, - "dropping compaction analytics event: missing connection metadata" - ); - return; - }; - let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - "dropping compaction analytics event: missing thread lifecycle metadata" - ); + let Some(ResolvedAnalyticsContext { + connection_state, + thread_metadata: Some(thread_metadata), + }) = self.resolve_analytics_context( + AnalyticsDropSite::compaction(&input), + AnalyticsConnectionSource::Thread(&input.thread_id), + ThreadMetadataRequirement::Required(&input.thread_id), + ) + else { return; }; out.push(TrackEventRequest::Compaction(Box::new( @@ -781,14 +855,15 @@ impl AnalyticsReducer { rejection_reason: Option, out: &mut Vec, ) { - let Some(connection_state) = self.connections.get(&connection_id) else { - return; - }; - let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else { - tracing::warn!( - thread_id = %pending_request.thread_id, - "dropping turn steer analytics event: missing thread lifecycle metadata" - ); + let Some(ResolvedAnalyticsContext { + connection_state, + thread_metadata: Some(thread_metadata), + }) = self.resolve_analytics_context( + AnalyticsDropSite::turn_steer(&pending_request.thread_id), + AnalyticsConnectionSource::Known(connection_id), + ThreadMetadataRequirement::Required(&pending_request.thread_id), + ) + else { return; }; out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest { @@ -821,42 +896,26 @@ impl AnalyticsReducer { { return; } - let connection_metadata = turn_state - .connection_id - .and_then(|connection_id| self.connections.get(&connection_id)) - .map(|connection_state| { - ( - connection_state.app_server_client.clone(), - connection_state.runtime.clone(), - ) - }); - let Some((app_server_client, runtime)) = connection_metadata else { - if let Some(connection_id) = turn_state.connection_id { - tracing::warn!( - turn_id, - connection_id, - "dropping turn analytics event: missing connection metadata" - ); - } - return; - }; let Some(thread_id) = turn_state.thread_id.as_ref() else { return; }; - let Some(thread_metadata) = self.thread_metadata.get(thread_id) else { - tracing::warn!( - thread_id, - turn_id, - "dropping turn analytics event: missing thread lifecycle metadata" - ); + let Some(ResolvedAnalyticsContext { + connection_state, + thread_metadata: Some(thread_metadata), + }) = self.resolve_analytics_context( + AnalyticsDropSite::turn(thread_id, turn_id), + AnalyticsConnectionSource::MaybeKnown(turn_state.connection_id), + ThreadMetadataRequirement::Required(thread_id), + ) + else { return; }; out.push(TrackEventRequest::TurnEvent(Box::new( CodexTurnEventRequest { event_type: "codex_turn_event", event_params: codex_turn_event_params( - app_server_client, - runtime, + connection_state.app_server_client.clone(), + connection_state.runtime.clone(), turn_id.to_string(), turn_state, thread_metadata, @@ -865,6 +924,105 @@ impl AnalyticsReducer { ))); self.turns.remove(turn_id); } + + fn resolve_analytics_context( + &self, + drop_site: AnalyticsDropSite<'_>, + connection_source: AnalyticsConnectionSource<'_>, + thread_metadata_requirement: ThreadMetadataRequirement<'_>, + ) -> Option> { + let connection_id = match connection_source { + AnalyticsConnectionSource::Thread(thread_id) => { + let Some(thread_state) = self.threads.get(thread_id) else { + warn_missing_analytics_context( + &drop_site, + MissingAnalyticsContext::ThreadConnection, + ); + return None; + }; + let Some(connection_id) = thread_state.connection_id else { + warn_missing_analytics_context( + &drop_site, + MissingAnalyticsContext::ThreadConnection, + ); + return None; + }; + connection_id + } + AnalyticsConnectionSource::Known(connection_id) => connection_id, + AnalyticsConnectionSource::MaybeKnown(None) => return None, + AnalyticsConnectionSource::MaybeKnown(Some(connection_id)) => connection_id, + }; + let Some(connection_state) = self.connections.get(&connection_id) else { + warn_missing_analytics_context( + &drop_site, + MissingAnalyticsContext::Connection { connection_id }, + ); + return None; + }; + let thread_metadata = match thread_metadata_requirement { + ThreadMetadataRequirement::NotRequired => None, + ThreadMetadataRequirement::Required(thread_id) => { + let Some(thread_metadata) = self.thread_metadata(thread_id) else { + warn_missing_analytics_context( + &drop_site, + MissingAnalyticsContext::ThreadMetadata, + ); + return None; + }; + Some(thread_metadata) + } + }; + Some(ResolvedAnalyticsContext { + connection_state, + thread_metadata, + }) + } + + fn thread_metadata(&self, thread_id: &str) -> Option<&ThreadMetadataState> { + self.threads + .get(thread_id) + .and_then(|thread| thread.metadata.as_ref()) + } +} + +fn warn_missing_analytics_context( + drop_site: &AnalyticsDropSite<'_>, + missing: MissingAnalyticsContext, +) { + match missing { + MissingAnalyticsContext::ThreadConnection => { + tracing::warn!( + thread_id = %drop_site.thread_id, + turn_id = ?drop_site.turn_id, + review_id = ?drop_site.review_id, + item_id = ?drop_site.item_id, + "dropping {} analytics event: missing thread connection metadata", + drop_site.event_name + ); + } + MissingAnalyticsContext::Connection { connection_id } => { + tracing::warn!( + thread_id = %drop_site.thread_id, + turn_id = ?drop_site.turn_id, + review_id = ?drop_site.review_id, + item_id = ?drop_site.item_id, + connection_id, + "dropping {} analytics event: missing connection metadata", + drop_site.event_name + ); + } + MissingAnalyticsContext::ThreadMetadata => { + tracing::warn!( + thread_id = %drop_site.thread_id, + turn_id = ?drop_site.turn_id, + review_id = ?drop_site.review_id, + item_id = ?drop_site.item_id, + "dropping {} analytics event: missing thread lifecycle metadata", + drop_site.event_name + ); + } + } } fn codex_turn_event_params(