[codex-analytics] rework thread_source for thread analytics (#20949)

## Summary
- make `thread_source` an explicit optional thread-level field on
`thread/start`, `thread/fork`, and returned thread payloads
- persist `thread_source` in rollout/session metadata so resumed live
threads retain the original value
- replace the old best-effort `session_source` -> `thread_source`
mapping with an explicit caller-supplied analytics classification

## Why
Before this change, analytics `thread_source` was populated by a
best-effort mapping from `session_source`. `session_source` describes
the runtime/client surface, not the actual thread-level origin, so that
projection was not accurate enough to distinguish cases such as `user`,
`subagent`, `memory_consolidation`, and future thread origins reliably.

Making `thread_source` explicit keeps one thread-level analytics field
while letting callers provide the real classification directly instead
of recovering it indirectly from `session_source`.

## Impact
For new analytics events, `thread_source` now reflects the explicit
thread-level classification supplied by the caller rather than an
inferred value derived from `session_source`. Existing protocol fields
remain optional; callers that omit `threadSource` now produce `null`
instead of a best-effort inferred value.

## Validation
- `just write-app-server-schema`
- `cargo test -p codex-analytics -p codex-core -p
codex-app-server-protocol --no-run`
- `cargo test -p codex-app-server-protocol
generated_ts_optional_nullable_fields_only_in_params`
- `cargo test -p codex-analytics
thread_initialized_event_serializes_expected_shape`
- `cargo test -p codex-core
resume_stopped_thread_from_rollout_preserves_thread_source`
This commit is contained in:
rhan-oai
2026-05-05 19:12:31 -07:00
committed by GitHub
parent 94db03d5af
commit b3d4f1a9f0
98 changed files with 896 additions and 90 deletions

View File

@@ -79,6 +79,7 @@ use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSource as AppServerThreadSource;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
use codex_app_server_protocol::Turn;
@@ -107,6 +108,7 @@ use codex_protocol::protocol::HookSource;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TokenUsage;
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
@@ -118,14 +120,11 @@ use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread {
sample_thread_with_source(thread_id, ephemeral, AppServerSessionSource::Exec)
}
fn sample_thread_with_source(
fn sample_thread_with_metadata(
thread_id: &str,
ephemeral: bool,
source: AppServerSessionSource,
thread_source: Option<AppServerThreadSource>,
) -> Thread {
Thread {
id: thread_id.to_string(),
@@ -140,6 +139,7 @@ fn sample_thread_with_source(
cwd: test_path_buf("/tmp").abs(),
cli_version: "0.0.0".to_string(),
source,
thread_source,
agent_nickname: None,
agent_role: None,
git_info: None,
@@ -154,7 +154,12 @@ fn sample_thread_start_response(
model: &str,
) -> ClientResponsePayload {
ClientResponsePayload::ThreadStart(ThreadStartResponse {
thread: sample_thread(thread_id, ephemeral),
thread: sample_thread_with_metadata(
thread_id,
ephemeral,
AppServerSessionSource::Exec,
Some(AppServerThreadSource::User),
),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
@@ -198,6 +203,7 @@ fn sample_thread_resume_response(
ephemeral,
model,
AppServerSessionSource::Exec,
Some(AppServerThreadSource::User),
)
}
@@ -206,9 +212,10 @@ fn sample_thread_resume_response_with_source(
ephemeral: bool,
model: &str,
source: AppServerSessionSource,
thread_source: Option<AppServerThreadSource>,
) -> ClientResponsePayload {
ClientResponsePayload::ThreadResume(ThreadResumeResponse {
thread: sample_thread_with_source(thread_id, ephemeral, source),
thread: sample_thread_with_metadata(thread_id, ephemeral, source, thread_source),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
@@ -753,7 +760,7 @@ fn compaction_event_serializes_expected_shape() {
},
sample_app_server_client_metadata(),
sample_runtime_metadata(),
Some("user"),
Some(ThreadSource::User),
/*subagent_source*/ None,
/*parent_thread_id*/ None,
),
@@ -852,7 +859,7 @@ fn thread_initialized_event_serializes_expected_shape() {
},
model: "gpt-5".to_string(),
ephemeral: true,
thread_source: Some("user"),
thread_source: Some(ThreadSource::User),
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
@@ -1196,6 +1203,7 @@ async fn compaction_event_ingests_custom_fact() {
agent_nickname: None,
agent_role: None,
}),
Some(AppServerThreadSource::Subagent),
)),
},
&mut events,
@@ -2116,7 +2124,7 @@ fn turn_event_serializes_expected_shape() {
runtime: sample_runtime_metadata(),
submission_type: None,
ephemeral: false,
thread_source: Some("user".to_string()),
thread_source: Some(ThreadSource::User),
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,

View File

@@ -87,6 +87,7 @@ fn sample_thread(thread_id: &str) -> Thread {
cwd: test_path_buf("/tmp").abs(),
cli_version: "0.0.0".to_string(),
source: AppServerSessionSource::Exec,
thread_source: None,
agent_nickname: None,
agent_role: None,
git_info: None,

View File

@@ -33,6 +33,7 @@ use codex_protocol::protocol::HookEventName;
use codex_protocol::protocol::HookRunStatus;
use codex_protocol::protocol::HookSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TokenUsage;
use serde::Serialize;
@@ -126,7 +127,7 @@ pub(crate) struct ThreadInitializedEventParams {
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) model: String,
pub(crate) ephemeral: bool,
pub(crate) thread_source: Option<&'static str>,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
@@ -647,7 +648,7 @@ pub(crate) struct CodexCompactionEventParams {
pub(crate) turn_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<&'static str>,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) trigger: CompactionTrigger,
@@ -680,7 +681,7 @@ pub(crate) struct CodexTurnEventParams {
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) ephemeral: bool,
pub(crate) thread_source: Option<String>,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
@@ -733,7 +734,7 @@ pub(crate) struct CodexTurnSteerEventParams {
pub(crate) accepted_turn_id: Option<String>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<String>,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) num_input_images: usize,
@@ -836,7 +837,7 @@ pub(crate) fn codex_compaction_event_params(
input: CodexCompactionEvent,
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
thread_source: Option<&'static str>,
thread_source: Option<ThreadSource>,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
) -> CodexCompactionEventParams {
@@ -940,7 +941,7 @@ pub(crate) fn subagent_thread_started_event_request(
runtime: current_runtime_metadata(),
model: input.model,
ephemeral: input.ephemeral,
thread_source: Some("subagent"),
thread_source: Some(ThreadSource::Subagent),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
parent_thread_id: input

View File

@@ -64,6 +64,7 @@ use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TokenUsage;
use sha1::Digest;
use std::collections::HashMap;
@@ -147,7 +148,7 @@ enum MissingAnalyticsContext {
#[derive(Clone)]
struct ThreadMetadataState {
thread_source: Option<&'static str>,
thread_source: Option<ThreadSource>,
initialization_mode: ThreadInitializationMode,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
@@ -156,6 +157,7 @@ struct ThreadMetadataState {
impl ThreadMetadataState {
fn from_thread_metadata(
session_source: &SessionSource,
thread_source: Option<ThreadSource>,
initialization_mode: ThreadInitializationMode,
) -> Self {
let (subagent_source, parent_thread_id) = match session_source {
@@ -172,7 +174,7 @@ impl ThreadMetadataState {
| SessionSource::Unknown => (None, None),
};
Self {
thread_source: session_source.thread_source_name(),
thread_source,
initialization_mode,
subagent_source,
parent_thread_id,
@@ -348,7 +350,7 @@ impl AnalyticsReducer {
thread_state
.metadata
.get_or_insert_with(|| ThreadMetadataState {
thread_source: Some("subagent"),
thread_source: Some(ThreadSource::Subagent),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
parent_thread_id,
@@ -749,13 +751,16 @@ impl AnalyticsReducer {
initialization_mode: ThreadInitializationMode,
out: &mut Vec<TrackEventRequest>,
) {
let thread_source: SessionSource = thread.source.into();
let session_source: SessionSource = thread.source.into();
let thread_id = thread.id;
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
let thread_metadata = ThreadMetadataState::from_thread_metadata(
&session_source,
thread.thread_source.map(Into::into),
initialization_mode,
);
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
@@ -857,7 +862,7 @@ impl AnalyticsReducer {
accepted_turn_id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source.map(str::to_string),
thread_source: thread_metadata.thread_source,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
num_input_images: pending_request.num_input_images,
@@ -1023,7 +1028,7 @@ fn codex_turn_event_params(
runtime,
submission_type,
ephemeral,
thread_source: thread_metadata.thread_source.map(str::to_string),
thread_source: thread_metadata.thread_source,
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),