[codex-analytics] centralize thread analytics state

This commit is contained in:
rhan-oai
2026-04-29 12:02:25 -07:00
parent 27f9236e22
commit b39f92b4cf

View File

@@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer {
requests: HashMap<(u64, RequestId), RequestState>,
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
thread_metadata: HashMap<String, ThreadMetadataState>,
threads: HashMap<String, ThreadAnalyticsState>,
}
struct ConnectionState {
@@ -83,6 +82,84 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
#[derive(Default)]
struct ThreadAnalyticsState {
connection_id: Option<u64>,
metadata: Option<ThreadMetadataState>,
}
struct ResolvedAnalyticsContext<'a> {
connection_state: &'a ConnectionState,
thread_metadata: Option<&'a ThreadMetadataState>,
}
enum AnalyticsConnectionSource<'a> {
Thread(&'a str),
Known(u64),
MaybeKnown(Option<u64>),
}
enum ThreadMetadataRequirement<'a> {
NotRequired,
Required(&'a str),
}
struct AnalyticsDropSite<'a> {
event_name: &'static str,
thread_id: &'a str,
turn_id: Option<&'a str>,
review_id: Option<&'a str>,
item_id: Option<&'a str>,
}
impl<'a> AnalyticsDropSite<'a> {
fn guardian(input: &'a GuardianReviewEventParams) -> Self {
Self {
event_name: "guardian",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: Some(&input.review_id),
item_id: None,
}
}
fn compaction(input: &'a CodexCompactionEvent) -> Self {
Self {
event_name: "compaction",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: None,
item_id: None,
}
}
fn turn_steer(thread_id: &'a str) -> Self {
Self {
event_name: "turn steer",
thread_id,
turn_id: None,
review_id: None,
item_id: None,
}
}
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
Self {
event_name: "turn",
thread_id,
turn_id: Some(turn_id),
review_id: None,
item_id: None,
}
}
}
enum MissingAnalyticsContext {
ThreadConnection,
Connection { connection_id: u64 },
ThreadMetadata,
}
#[derive(Clone)]
struct ThreadMetadataState {
thread_source: Option<&'static str>,
@@ -271,6 +348,22 @@ impl AnalyticsReducer {
input: SubAgentThreadStartedInput,
out: &mut Vec<TrackEventRequest>,
) {
let parent_thread_id = input
.parent_thread_id
.clone()
.or_else(|| subagent_parent_thread_id(&input.subagent_source));
let parent_connection_id = parent_thread_id
.as_ref()
.and_then(|parent_thread_id| self.threads.get(parent_thread_id))
.and_then(|thread| thread.connection_id);
let thread_state = self.threads.entry(input.thread_id.clone()).or_default();
thread_state.metadata = Some(ThreadMetadataState {
thread_source: Some("subagent"),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
parent_thread_id,
});
thread_state.connection_id = parent_connection_id;
out.push(TrackEventRequest::ThreadInitialized(
subagent_thread_started_event_request(input),
));
@@ -281,23 +374,15 @@ impl AnalyticsReducer {
input: GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
"dropping guardian analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
connection_id,
"dropping guardian analytics event: missing connection metadata"
);
let Some(ResolvedAnalyticsContext {
connection_state,
thread_metadata: None,
}) = self.resolve_analytics_context(
AnalyticsDropSite::guardian(&input),
AnalyticsConnectionSource::Thread(&input.thread_id),
ThreadMetadataRequirement::NotRequired,
)
else {
return;
};
out.push(TrackEventRequest::GuardianReview(Box::new(
@@ -683,10 +768,13 @@ impl AnalyticsReducer {
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
.insert(thread_id.clone(), thread_metadata.clone());
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
connection_id: Some(connection_id),
metadata: Some(thread_metadata.clone()),
},
);
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
@@ -707,29 +795,15 @@ impl AnalyticsReducer {
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
connection_id,
"dropping compaction analytics event: missing connection metadata"
);
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread lifecycle metadata"
);
let Some(ResolvedAnalyticsContext {
connection_state,
thread_metadata: Some(thread_metadata),
}) = self.resolve_analytics_context(
AnalyticsDropSite::compaction(&input),
AnalyticsConnectionSource::Thread(&input.thread_id),
ThreadMetadataRequirement::Required(&input.thread_id),
)
else {
return;
};
out.push(TrackEventRequest::Compaction(Box::new(
@@ -781,14 +855,15 @@ impl AnalyticsReducer {
rejection_reason: Option<TurnSteerRejectionReason>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else {
tracing::warn!(
thread_id = %pending_request.thread_id,
"dropping turn steer analytics event: missing thread lifecycle metadata"
);
let Some(ResolvedAnalyticsContext {
connection_state,
thread_metadata: Some(thread_metadata),
}) = self.resolve_analytics_context(
AnalyticsDropSite::turn_steer(&pending_request.thread_id),
AnalyticsConnectionSource::Known(connection_id),
ThreadMetadataRequirement::Required(&pending_request.thread_id),
)
else {
return;
};
out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest {
@@ -821,42 +896,26 @@ impl AnalyticsReducer {
{
return;
}
let connection_metadata = turn_state
.connection_id
.and_then(|connection_id| self.connections.get(&connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
if let Some(connection_id) = turn_state.connection_id {
tracing::warn!(
turn_id,
connection_id,
"dropping turn analytics event: missing connection metadata"
);
}
return;
};
let Some(thread_id) = turn_state.thread_id.as_ref() else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(thread_id) else {
tracing::warn!(
thread_id,
turn_id,
"dropping turn analytics event: missing thread lifecycle metadata"
);
let Some(ResolvedAnalyticsContext {
connection_state,
thread_metadata: Some(thread_metadata),
}) = self.resolve_analytics_context(
AnalyticsDropSite::turn(thread_id, turn_id),
AnalyticsConnectionSource::MaybeKnown(turn_state.connection_id),
ThreadMetadataRequirement::Required(thread_id),
)
else {
return;
};
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
app_server_client,
runtime,
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
turn_id.to_string(),
turn_state,
thread_metadata,
@@ -865,6 +924,105 @@ impl AnalyticsReducer {
)));
self.turns.remove(turn_id);
}
fn resolve_analytics_context(
&self,
drop_site: AnalyticsDropSite<'_>,
connection_source: AnalyticsConnectionSource<'_>,
thread_metadata_requirement: ThreadMetadataRequirement<'_>,
) -> Option<ResolvedAnalyticsContext<'_>> {
let connection_id = match connection_source {
AnalyticsConnectionSource::Thread(thread_id) => {
let Some(thread_state) = self.threads.get(thread_id) else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::ThreadConnection,
);
return None;
};
let Some(connection_id) = thread_state.connection_id else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::ThreadConnection,
);
return None;
};
connection_id
}
AnalyticsConnectionSource::Known(connection_id) => connection_id,
AnalyticsConnectionSource::MaybeKnown(None) => return None,
AnalyticsConnectionSource::MaybeKnown(Some(connection_id)) => connection_id,
};
let Some(connection_state) = self.connections.get(&connection_id) else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::Connection { connection_id },
);
return None;
};
let thread_metadata = match thread_metadata_requirement {
ThreadMetadataRequirement::NotRequired => None,
ThreadMetadataRequirement::Required(thread_id) => {
let Some(thread_metadata) = self.thread_metadata(thread_id) else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::ThreadMetadata,
);
return None;
};
Some(thread_metadata)
}
};
Some(ResolvedAnalyticsContext {
connection_state,
thread_metadata,
})
}
fn thread_metadata(&self, thread_id: &str) -> Option<&ThreadMetadataState> {
self.threads
.get(thread_id)
.and_then(|thread| thread.metadata.as_ref())
}
}
fn warn_missing_analytics_context(
drop_site: &AnalyticsDropSite<'_>,
missing: MissingAnalyticsContext,
) {
match missing {
MissingAnalyticsContext::ThreadConnection => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
"dropping {} analytics event: missing thread connection metadata",
drop_site.event_name
);
}
MissingAnalyticsContext::Connection { connection_id } => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
connection_id,
"dropping {} analytics event: missing connection metadata",
drop_site.event_name
);
}
MissingAnalyticsContext::ThreadMetadata => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
"dropping {} analytics event: missing thread lifecycle metadata",
drop_site.event_name
);
}
}
}
fn codex_turn_event_params(