[codex-analytics] rework thread_source for thread analytics (#20949)

## Summary
- make `thread_source` an explicit optional thread-level field on
`thread/start`, `thread/fork`, and returned thread payloads
- persist `thread_source` in rollout/session metadata so resumed live
threads retain the original value
- replace the old best-effort `session_source` -> `thread_source`
mapping with an explicit caller-supplied analytics classification

## Why
Before this change, analytics `thread_source` was populated by a
best-effort mapping from `session_source`. `session_source` describes
the runtime/client surface, not the actual thread-level origin, so that
projection was not accurate enough to distinguish cases such as `user`,
`subagent`, `memory_consolidation`, and future thread origins reliably.

Making `thread_source` explicit keeps one thread-level analytics field
while letting callers provide the real classification directly instead
of recovering it indirectly from `session_source`.

## Impact
For new analytics events, `thread_source` now reflects the explicit
thread-level classification supplied by the caller rather than an
inferred value derived from `session_source`. Existing protocol fields
remain optional; callers that omit `threadSource` now produce `null`
instead of a best-effort inferred value.

## Validation
- `just write-app-server-schema`
- `cargo test -p codex-analytics -p codex-core -p
codex-app-server-protocol --no-run`
- `cargo test -p codex-app-server-protocol
generated_ts_optional_nullable_fields_only_in_params`
- `cargo test -p codex-analytics
thread_initialized_event_serializes_expected_shape`
- `cargo test -p codex-core
resume_stopped_thread_from_rollout_preserves_thread_source`
This commit is contained in:
rhan-oai
2026-05-05 19:12:31 -07:00
committed by Channing Conger
parent 55c135600c
commit 5ef71a8e53
98 changed files with 896 additions and 90 deletions

View File

@@ -272,6 +272,7 @@ fn stored_thread_from_state(
cwd: PathBuf::new(),
cli_version: "test".to_string(),
source: created.source.clone(),
thread_source: created.thread_source,
agent_nickname: None,
agent_role: None,
agent_path: None,

View File

@@ -34,6 +34,7 @@ pub(super) async fn create_thread(
params.thread_id,
params.forked_from_id,
params.source,
params.thread_source,
params.base_instructions,
params.dynamic_tools,
event_persistence_mode(params.event_persistence_mode),

View File

@@ -130,6 +130,7 @@ pub(super) fn stored_thread_from_rollout_item(
cwd: item.cwd.unwrap_or_default(),
cli_version: item.cli_version.unwrap_or_default(),
source,
thread_source: None,
agent_nickname: item.agent_nickname,
agent_role: item.agent_role,
agent_path: None,

View File

@@ -744,6 +744,7 @@ mod tests {
thread_id,
forked_from_id: None,
source: SessionSource::Exec,
thread_source: None,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: thread_metadata(),

View File

@@ -274,10 +274,11 @@ async fn stored_thread_from_sqlite_metadata(
.ok()
.flatten(),
};
let forked_from_id = read_session_meta_line(metadata.rollout_path.as_path())
let session_meta = read_session_meta_line(metadata.rollout_path.as_path())
.await
.ok()
.and_then(|meta_line| meta_line.meta.forked_from_id);
.map(|meta_line| meta_line.meta);
let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id);
StoredThread {
thread_id: metadata.id,
rollout_path: Some(metadata.rollout_path),
@@ -297,6 +298,7 @@ async fn stored_thread_from_sqlite_metadata(
cwd: metadata.cwd,
cli_version: metadata.cli_version,
source: parse_session_source(&metadata.source),
thread_source: metadata.thread_source,
agent_nickname: metadata.agent_nickname,
agent_role: metadata.agent_role,
agent_path: metadata.agent_path,
@@ -362,6 +364,7 @@ fn stored_thread_from_meta_line(
cwd: meta_line.meta.cwd,
cli_version: meta_line.meta.cli_version,
source: meta_line.meta.source,
thread_source: meta_line.meta.thread_source,
agent_nickname: meta_line.meta.agent_nickname,
agent_role: meta_line.meta.agent_role,
agent_path: meta_line.meta.agent_path,

View File

@@ -16,6 +16,7 @@ use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::ThreadSource;
use super::proto;
use crate::GitInfoPatch;
@@ -296,6 +297,11 @@ pub(super) fn stored_thread_from_proto(
cwd: PathBuf::from(thread.cwd),
cli_version: thread.cli_version,
source,
thread_source: thread
.thread_source
.map(|thread_source| thread_source.parse::<ThreadSource>())
.transpose()
.map_err(|error| ThreadStoreError::Internal { message: error })?,
agent_nickname: thread.agent_nickname,
agent_role: thread.agent_role,
agent_path: thread.agent_path,
@@ -340,6 +346,7 @@ pub(super) fn stored_thread_to_proto(thread: StoredThread) -> proto::StoredThrea
cwd: thread.cwd.to_string_lossy().into_owned(),
cli_version: thread.cli_version,
source: Some(proto_session_source(&thread.source)),
thread_source: thread.thread_source.map(|source| source.to_string()),
git_info: thread.git_info.map(git_info_to_proto),
agent_nickname: thread.agent_nickname,
agent_role: thread.agent_role,

View File

@@ -140,6 +140,7 @@ mod tests {
kind: proto::SessionSourceKind::Cli.into(),
..Default::default()
}),
thread_source: Some("user".to_string()),
git_info: Some(proto::GitInfo {
sha: Some("abc123".to_string()),
branch: Some("main".to_string()),
@@ -250,6 +251,7 @@ mod tests {
sub_agent_role: Some("explorer".to_string()),
..Default::default()
}),
thread_source: Some("subagent".to_string()),
git_info: Some(proto::GitInfo {
sha: Some("abc123".to_string()),
branch: Some("main".to_string()),

View File

@@ -358,6 +358,7 @@ mod tests {
thread_id: ThreadId::new(),
forked_from_id: None,
source: SessionSource::Exec,
thread_source: None,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: metadata.clone(),

View File

@@ -132,6 +132,7 @@ message StoredThread {
optional string sandbox_policy_json = 21;
optional string token_usage_json = 22;
optional StoredThreadHistory history = 23;
optional string thread_source = 24;
}
message SessionSource {

View File

@@ -168,6 +168,8 @@ pub struct StoredThread {
pub token_usage_json: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "23")]
pub history: ::core::option::Option<StoredThreadHistory>,
#[prost(string, optional, tag = "24")]
pub thread_source: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SessionSource {

View File

@@ -12,6 +12,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadMemoryMode as MemoryMode;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TokenUsage;
use serde::Deserialize;
use serde::Serialize;
@@ -48,6 +49,8 @@ pub struct CreateThreadParams {
pub forked_from_id: Option<ThreadId>,
/// Runtime source for the thread.
pub source: SessionSource,
/// Optional analytics source classification for this thread.
pub thread_source: Option<ThreadSource>,
/// Base instructions persisted in session metadata.
pub base_instructions: BaseInstructions,
/// Dynamic tools available to the thread at startup.
@@ -211,6 +214,8 @@ pub struct StoredThread {
pub cli_version: String,
/// Runtime source for the thread.
pub source: SessionSource,
/// Optional analytics source classification for this thread.
pub thread_source: Option<ThreadSource>,
/// Optional random nickname for thread-spawn sub-agents.
pub agent_nickname: Option<String>,
/// Optional role for thread-spawn sub-agents.