[codex-analytics] add grouped session id to runtime events (#24655)

## Why
- Runtime analytics events report `thread_id`, which identifies the
individual thread emitting an event
- They don't report `session_id`, which identifies the shared session
for a root thread and its subagent threads
- Emitting both identifiers allows analytics to group related activity

## What Changed
- Adds `session_id` to relevant analytics events (thread_initalized,
turn, turn_steer, compaction, guardian_review)
- Tracks each thread's session ID in the analytics reducer so subsequent
thread scoped events emit the same value
- Carries the shared session ID through subagent initialization

## Verification
- `just test -p codex-analytics` validates event payloads and subagent
session grouping.
- Focused `codex-app-server` tests validate session IDs for thread,
turn, and steer events.
- Focused `codex-core` tests validate root and subagent session ID
propagation.
This commit is contained in:
marksteinbrick-oai
2026-05-26 16:38:46 -07:00
committed by GitHub
parent dc4e54d061
commit 487521733b
13 changed files with 87 additions and 5 deletions

View File

@@ -1213,6 +1213,7 @@ fn compaction_event_serializes_expected_shape() {
completed_at: 106,
duration_ms: Some(6543),
},
"session-thread-1".to_string(),
sample_app_server_client_metadata(),
sample_runtime_metadata(),
Some(ThreadSource::User),
@@ -1229,6 +1230,7 @@ fn compaction_event_serializes_expected_shape() {
"event_type": "codex_compaction_event",
"event_params": {
"thread_id": "thread-1",
"session_id": "session-thread-1",
"turn_id": "turn-1",
"app_server_client": {
"product_client_id": DEFAULT_ORIGINATOR,
@@ -1307,6 +1309,7 @@ fn thread_initialized_event_serializes_expected_shape() {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id: "thread-0".to_string(),
session_id: "session-thread-0".to_string(),
app_server_client: CodexAppServerClientMetadata {
product_client_id: DEFAULT_ORIGINATOR.to_string(),
client_name: Some("codex-tui".to_string()),
@@ -1338,6 +1341,7 @@ fn thread_initialized_event_serializes_expected_shape() {
"event_type": "codex_thread_initialized",
"event_params": {
"thread_id": "thread-0",
"session_id": "session-thread-0",
"app_server_client": {
"product_client_id": DEFAULT_ORIGINATOR,
"client_name": "codex-tui",
@@ -1605,6 +1609,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");
assert_eq!(payload[0]["event_params"]["session_id"], "session-thread-1");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["product_client_id"],
DEFAULT_ORIGINATOR
@@ -1781,6 +1786,7 @@ async fn compaction_event_ingests_custom_fact() {
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
assert_eq!(payload[0]["event_params"]["session_id"], "session-thread-1");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-compact");
assert_eq!(
@@ -1905,6 +1911,10 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_guardian_review");
assert_eq!(
payload[0]["event_params"]["session_id"],
"session-thread-guardian"
);
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-guardian");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-guardian");
assert_eq!(payload[0]["event_params"]["review_id"], "review-guardian");
@@ -2397,6 +2407,7 @@ async fn item_review_summaries_do_not_cross_threads_with_reused_item_ids() {
fn subagent_thread_started_review_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-review".to_string(),
parent_thread_id: None,
product_client_id: "codex-tui".to_string(),
@@ -2440,6 +2451,7 @@ fn subagent_thread_started_thread_spawn_serializes_parent_thread_id() {
.expect("valid thread id");
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-spawn".to_string(),
parent_thread_id: None,
product_client_id: "codex-tui".to_string(),
@@ -2459,18 +2471,21 @@ fn subagent_thread_started_thread_spawn_serializes_parent_thread_id() {
));
let payload = serde_json::to_value(&event).expect("serialize thread spawn subagent event");
assert_eq!(payload["event_params"]["thread_id"], "thread-spawn");
assert_eq!(payload["event_params"]["thread_source"], "subagent");
assert_eq!(payload["event_params"]["subagent_source"], "thread_spawn");
assert_eq!(
payload["event_params"]["parent_thread_id"],
"11111111-1111-1111-1111-111111111111"
);
assert_eq!(payload["event_params"]["session_id"], "session-root");
}
#[test]
fn subagent_thread_started_memory_consolidation_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-memory".to_string(),
parent_thread_id: None,
product_client_id: "codex-tui".to_string(),
@@ -2496,6 +2511,7 @@ fn subagent_thread_started_memory_consolidation_serializes_expected_shape() {
fn subagent_thread_started_other_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-guardian".to_string(),
parent_thread_id: None,
product_client_id: "codex-tui".to_string(),
@@ -2517,6 +2533,7 @@ fn subagent_thread_started_other_serializes_expected_shape() {
fn subagent_thread_started_other_serializes_explicit_parent_thread_id() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-guardian".to_string(),
parent_thread_id: Some("parent-thread-guardian".to_string()),
product_client_id: "codex-tui".to_string(),
@@ -2546,6 +2563,7 @@ async fn subagent_thread_started_publishes_without_initialize() {
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-review".to_string(),
parent_thread_id: None,
product_client_id: "codex-tui".to_string(),
@@ -2619,6 +2637,7 @@ async fn subagent_thread_started_inherits_parent_connection_for_new_thread() {
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-review".to_string(),
parent_thread_id: None,
product_client_id: "parent-client".to_string(),
@@ -2666,6 +2685,8 @@ async fn subagent_thread_started_inherits_parent_connection_for_new_thread() {
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload[0]["event_params"]["session_id"], "session-root");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-review");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["product_client_id"],
"parent-client"
@@ -2686,6 +2707,7 @@ async fn subagent_tool_items_inherit_parent_connection_metadata() {
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
session_id: "session-root".to_string(),
thread_id: "thread-subagent".to_string(),
parent_thread_id: Some("thread-1".to_string()),
product_client_id: "codex-tui".to_string(),
@@ -3191,6 +3213,7 @@ fn turn_event_serializes_expected_shape() {
event_type: "codex_turn_event",
event_params: crate::events::CodexTurnEventParams {
thread_id: "thread-2".to_string(),
session_id: "session-thread-2".to_string(),
turn_id: "turn-2".to_string(),
app_server_client: sample_app_server_client_metadata(),
runtime: sample_runtime_metadata(),
@@ -3241,6 +3264,7 @@ fn turn_event_serializes_expected_shape() {
"event_type": "codex_turn_event",
"event_params": {
"thread_id": "thread-2",
"session_id": "session-thread-2",
"turn_id": "turn-2",
"submission_type": null,
"app_server_client": {
@@ -3342,6 +3366,10 @@ async fn accepted_turn_steer_emits_expected_event() {
let payload = serde_json::to_value(&out[0]).expect("serialize turn steer event");
assert_eq!(payload["event_type"], json!("codex_turn_steer_event"));
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
assert_eq!(
payload["event_params"]["session_id"],
json!("session-thread-2")
);
assert_eq!(payload["event_params"]["expected_turn_id"], json!("turn-2"));
assert_eq!(payload["event_params"]["accepted_turn_id"], json!("turn-2"));
assert_eq!(payload["event_params"]["num_input_images"], json!(1));
@@ -3559,6 +3587,10 @@ async fn turn_lifecycle_emits_turn_event() {
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
assert_eq!(payload["event_type"], json!("codex_turn_event"));
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
assert_eq!(
payload["event_params"]["session_id"],
json!("session-thread-2")
);
assert_eq!(payload["event_params"]["turn_id"], json!("turn-2"));
assert_eq!(
payload["event_params"]["app_server_client"],

View File

@@ -147,6 +147,7 @@ pub(crate) struct CodexRuntimeMetadata {
#[derive(Serialize)]
pub(crate) struct ThreadInitializedEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) model: String,
@@ -420,6 +421,7 @@ impl GuardianReviewAnalyticsResult {
#[derive(Serialize)]
pub(crate) struct GuardianReviewEventPayload {
pub(crate) session_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
#[serde(flatten)]
@@ -738,6 +740,7 @@ pub(crate) struct CodexHookRunEventRequest {
#[derive(Serialize)]
pub(crate) struct CodexCompactionEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) turn_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
@@ -767,6 +770,7 @@ pub(crate) struct CodexCompactionEventRequest {
#[derive(Serialize)]
pub(crate) struct CodexTurnEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) turn_id: String,
// TODO(rhan-oai): Populate once queued/default submission type is plumbed from
// the turn/start callsites instead of always being reported as None.
@@ -821,6 +825,7 @@ pub(crate) struct CodexTurnEventRequest {
#[derive(Serialize)]
pub(crate) struct CodexTurnSteerEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) expected_turn_id: Option<String>,
pub(crate) accepted_turn_id: Option<String>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
@@ -926,6 +931,7 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu
pub(crate) fn codex_compaction_event_params(
input: CodexCompactionEvent,
session_id: String,
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
thread_source: Option<ThreadSource>,
@@ -934,6 +940,7 @@ pub(crate) fn codex_compaction_event_params(
) -> CodexCompactionEventParams {
CodexCompactionEventParams {
thread_id: input.thread_id,
session_id,
turn_id: input.turn_id,
app_server_client,
runtime,
@@ -1026,6 +1033,7 @@ pub(crate) fn subagent_thread_started_event_request(
) -> ThreadInitializedEvent {
let event_params = ThreadInitializedEventParams {
thread_id: input.thread_id,
session_id: input.session_id,
app_server_client: CodexAppServerClientMetadata {
product_client_id: input.product_client_id,
client_name: Some(input.client_name),

View File

@@ -199,6 +199,7 @@ pub struct AppInvocation {
#[derive(Clone)]
pub struct SubAgentThreadStartedInput {
pub session_id: String,
pub thread_id: String,
pub parent_thread_id: Option<String>,
pub product_client_id: String,

View File

@@ -255,6 +255,7 @@ struct ItemReviewSummary {
#[derive(Clone)]
struct ThreadMetadataState {
session_id: String,
thread_source: Option<ThreadSource>,
initialization_mode: ThreadInitializationMode,
subagent_source: Option<String>,
@@ -263,6 +264,7 @@ struct ThreadMetadataState {
impl ThreadMetadataState {
fn from_thread_metadata(
session_id: String,
session_source: &SessionSource,
thread_source: Option<ThreadSource>,
initialization_mode: ThreadInitializationMode,
@@ -281,6 +283,7 @@ impl ThreadMetadataState {
| SessionSource::Unknown => (None, None),
};
Self {
session_id,
thread_source,
initialization_mode,
subagent_source,
@@ -525,6 +528,7 @@ impl AnalyticsReducer {
thread_state
.metadata
.get_or_insert_with(|| ThreadMetadataState {
session_id: input.session_id.clone(),
thread_source: Some(ThreadSource::Subagent),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
@@ -543,8 +547,8 @@ impl AnalyticsReducer {
input: GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_state) =
self.thread_connection_or_warn(AnalyticsDropSite::guardian(&input))
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::guardian(&input))
else {
return;
};
@@ -552,6 +556,7 @@ impl AnalyticsReducer {
GuardianReviewEventRequest {
event_type: "codex_guardian_review",
event_params: GuardianReviewEventPayload {
session_id: thread_metadata.session_id.clone(),
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
guardian_review: input,
@@ -1231,11 +1236,13 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
let session_source: SessionSource = thread.source.into();
let session_id = thread.session_id;
let thread_id = thread.id;
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let thread_metadata = ThreadMetadataState::from_thread_metadata(
session_id.clone(),
&session_source,
thread.thread_source.map(Into::into),
initialization_mode,
@@ -1252,6 +1259,7 @@ impl AnalyticsReducer {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id,
session_id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
model,
@@ -1277,6 +1285,7 @@ impl AnalyticsReducer {
event_type: "codex_compaction_event",
event_params: codex_compaction_event_params(
input,
thread_metadata.session_id.clone(),
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
thread_metadata.thread_source,
@@ -1379,6 +1388,7 @@ impl AnalyticsReducer {
event_type: "codex_turn_steer_event",
event_params: CodexTurnSteerEventParams {
thread_id: pending_request.thread_id,
session_id: thread_metadata.session_id.clone(),
expected_turn_id: Some(pending_request.expected_turn_id),
accepted_turn_id,
app_server_client: connection_state.app_server_client.clone(),
@@ -2447,6 +2457,7 @@ fn codex_turn_event_params(
let token_usage = turn_state.token_usage.clone();
CodexTurnEventParams {
thread_id,
session_id: thread_metadata.session_id.clone(),
turn_id,
app_server_client,
runtime,