diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index d2aadc8707..823dcace91 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -79,6 +79,7 @@ use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadSource as AppServerThreadSource; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus; use codex_app_server_protocol::Turn; @@ -107,6 +108,7 @@ use codex_protocol::protocol::HookSource; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; use codex_utils_absolute_path::test_support::PathBufExt; use codex_utils_absolute_path::test_support::test_path_buf; @@ -118,14 +120,11 @@ use std::sync::Arc; use std::sync::Mutex; use tokio::sync::mpsc; -fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread { - sample_thread_with_source(thread_id, ephemeral, AppServerSessionSource::Exec) -} - -fn sample_thread_with_source( +fn sample_thread_with_metadata( thread_id: &str, ephemeral: bool, source: AppServerSessionSource, + thread_source: Option, ) -> Thread { Thread { id: thread_id.to_string(), @@ -140,6 +139,7 @@ fn sample_thread_with_source( cwd: test_path_buf("/tmp").abs(), cli_version: "0.0.0".to_string(), source, + thread_source, agent_nickname: None, agent_role: None, git_info: None, @@ -154,7 +154,12 @@ fn sample_thread_start_response( model: &str, ) -> ClientResponsePayload { ClientResponsePayload::ThreadStart(ThreadStartResponse { - thread: sample_thread(thread_id, ephemeral), + thread: sample_thread_with_metadata( + thread_id, + ephemeral, + AppServerSessionSource::Exec, + Some(AppServerThreadSource::User), + ), model: model.to_string(), model_provider: "openai".to_string(), service_tier: None, @@ -198,6 +203,7 @@ fn sample_thread_resume_response( ephemeral, model, AppServerSessionSource::Exec, + Some(AppServerThreadSource::User), ) } @@ -206,9 +212,10 @@ fn sample_thread_resume_response_with_source( ephemeral: bool, model: &str, source: AppServerSessionSource, + thread_source: Option, ) -> ClientResponsePayload { ClientResponsePayload::ThreadResume(ThreadResumeResponse { - thread: sample_thread_with_source(thread_id, ephemeral, source), + thread: sample_thread_with_metadata(thread_id, ephemeral, source, thread_source), model: model.to_string(), model_provider: "openai".to_string(), service_tier: None, @@ -753,7 +760,7 @@ fn compaction_event_serializes_expected_shape() { }, sample_app_server_client_metadata(), sample_runtime_metadata(), - Some("user"), + Some(ThreadSource::User), /*subagent_source*/ None, /*parent_thread_id*/ None, ), @@ -852,7 +859,7 @@ fn thread_initialized_event_serializes_expected_shape() { }, model: "gpt-5".to_string(), ephemeral: true, - thread_source: Some("user"), + thread_source: Some(ThreadSource::User), initialization_mode: ThreadInitializationMode::New, subagent_source: None, parent_thread_id: None, @@ -1196,6 +1203,7 @@ async fn compaction_event_ingests_custom_fact() { agent_nickname: None, agent_role: None, }), + Some(AppServerThreadSource::Subagent), )), }, &mut events, @@ -2116,7 +2124,7 @@ fn turn_event_serializes_expected_shape() { runtime: sample_runtime_metadata(), submission_type: None, ephemeral: false, - thread_source: Some("user".to_string()), + thread_source: Some(ThreadSource::User), initialization_mode: ThreadInitializationMode::New, subagent_source: None, parent_thread_id: None, diff --git a/codex-rs/analytics/src/client_tests.rs b/codex-rs/analytics/src/client_tests.rs index 14ce570d79..c36b5cf9a7 100644 --- a/codex-rs/analytics/src/client_tests.rs +++ b/codex-rs/analytics/src/client_tests.rs @@ -87,6 +87,7 @@ fn sample_thread(thread_id: &str) -> Thread { cwd: test_path_buf("/tmp").abs(), cli_version: "0.0.0".to_string(), source: AppServerSessionSource::Exec, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index ca8febd1b3..383cf6857e 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -33,6 +33,7 @@ use codex_protocol::protocol::HookEventName; use codex_protocol::protocol::HookRunStatus; use codex_protocol::protocol::HookSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; use serde::Serialize; @@ -126,7 +127,7 @@ pub(crate) struct ThreadInitializedEventParams { pub(crate) runtime: CodexRuntimeMetadata, pub(crate) model: String, pub(crate) ephemeral: bool, - pub(crate) thread_source: Option<&'static str>, + pub(crate) thread_source: Option, pub(crate) initialization_mode: ThreadInitializationMode, pub(crate) subagent_source: Option, pub(crate) parent_thread_id: Option, @@ -647,7 +648,7 @@ pub(crate) struct CodexCompactionEventParams { pub(crate) turn_id: String, pub(crate) app_server_client: CodexAppServerClientMetadata, pub(crate) runtime: CodexRuntimeMetadata, - pub(crate) thread_source: Option<&'static str>, + pub(crate) thread_source: Option, pub(crate) subagent_source: Option, pub(crate) parent_thread_id: Option, pub(crate) trigger: CompactionTrigger, @@ -680,7 +681,7 @@ pub(crate) struct CodexTurnEventParams { pub(crate) app_server_client: CodexAppServerClientMetadata, pub(crate) runtime: CodexRuntimeMetadata, pub(crate) ephemeral: bool, - pub(crate) thread_source: Option, + pub(crate) thread_source: Option, pub(crate) initialization_mode: ThreadInitializationMode, pub(crate) subagent_source: Option, pub(crate) parent_thread_id: Option, @@ -733,7 +734,7 @@ pub(crate) struct CodexTurnSteerEventParams { pub(crate) accepted_turn_id: Option, pub(crate) app_server_client: CodexAppServerClientMetadata, pub(crate) runtime: CodexRuntimeMetadata, - pub(crate) thread_source: Option, + pub(crate) thread_source: Option, pub(crate) subagent_source: Option, pub(crate) parent_thread_id: Option, pub(crate) num_input_images: usize, @@ -836,7 +837,7 @@ pub(crate) fn codex_compaction_event_params( input: CodexCompactionEvent, app_server_client: CodexAppServerClientMetadata, runtime: CodexRuntimeMetadata, - thread_source: Option<&'static str>, + thread_source: Option, subagent_source: Option, parent_thread_id: Option, ) -> CodexCompactionEventParams { @@ -940,7 +941,7 @@ pub(crate) fn subagent_thread_started_event_request( runtime: current_runtime_metadata(), model: input.model, ephemeral: input.ephemeral, - thread_source: Some("subagent"), + thread_source: Some(ThreadSource::Subagent), initialization_mode: ThreadInitializationMode::New, subagent_source: Some(subagent_source_name(&input.subagent_source)), parent_thread_id: input diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 3554e570dd..772bb6c624 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -64,6 +64,7 @@ use codex_protocol::config_types::ReasoningSummary; use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SkillScope; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; use sha1::Digest; use std::collections::HashMap; @@ -147,7 +148,7 @@ enum MissingAnalyticsContext { #[derive(Clone)] struct ThreadMetadataState { - thread_source: Option<&'static str>, + thread_source: Option, initialization_mode: ThreadInitializationMode, subagent_source: Option, parent_thread_id: Option, @@ -156,6 +157,7 @@ struct ThreadMetadataState { impl ThreadMetadataState { fn from_thread_metadata( session_source: &SessionSource, + thread_source: Option, initialization_mode: ThreadInitializationMode, ) -> Self { let (subagent_source, parent_thread_id) = match session_source { @@ -172,7 +174,7 @@ impl ThreadMetadataState { | SessionSource::Unknown => (None, None), }; Self { - thread_source: session_source.thread_source_name(), + thread_source, initialization_mode, subagent_source, parent_thread_id, @@ -348,7 +350,7 @@ impl AnalyticsReducer { thread_state .metadata .get_or_insert_with(|| ThreadMetadataState { - thread_source: Some("subagent"), + thread_source: Some(ThreadSource::Subagent), initialization_mode: ThreadInitializationMode::New, subagent_source: Some(subagent_source_name(&input.subagent_source)), parent_thread_id, @@ -749,13 +751,16 @@ impl AnalyticsReducer { initialization_mode: ThreadInitializationMode, out: &mut Vec, ) { - let thread_source: SessionSource = thread.source.into(); + let session_source: SessionSource = thread.source.into(); let thread_id = thread.id; let Some(connection_state) = self.connections.get(&connection_id) else { return; }; - let thread_metadata = - ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode); + let thread_metadata = ThreadMetadataState::from_thread_metadata( + &session_source, + thread.thread_source.map(Into::into), + initialization_mode, + ); self.threads.insert( thread_id.clone(), ThreadAnalyticsState { @@ -857,7 +862,7 @@ impl AnalyticsReducer { accepted_turn_id, app_server_client: connection_state.app_server_client.clone(), runtime: connection_state.runtime.clone(), - thread_source: thread_metadata.thread_source.map(str::to_string), + thread_source: thread_metadata.thread_source, subagent_source: thread_metadata.subagent_source.clone(), parent_thread_id: thread_metadata.parent_thread_id.clone(), num_input_images: pending_request.num_input_images, @@ -1023,7 +1028,7 @@ fn codex_turn_event_params( runtime, submission_type, ephemeral, - thread_source: thread_metadata.thread_source.map(str::to_string), + thread_source: thread_metadata.thread_source, initialization_mode: thread_metadata.initialization_mode, subagent_source: thread_metadata.subagent_source.clone(), parent_thread_id: thread_metadata.parent_thread_id.clone(), diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 9ea9893f5b..75df89211d 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3558,6 +3558,17 @@ }, "threadId": { "type": "string" + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this forked thread." } }, "required": [ @@ -4032,6 +4043,14 @@ ], "type": "string" }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadSourceKind": { "enum": [ "cli", @@ -4165,6 +4184,17 @@ "type": "null" } ] + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this thread." } }, "type": "object" diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index ae54b610f5..f5de1f456c 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3088,6 +3088,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -4094,6 +4105,14 @@ ], "type": "object" }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStartedNotification": { "properties": { "thread": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index a1519e4240..922ca49a5c 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -15232,6 +15232,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/v2/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -15446,6 +15457,17 @@ }, "threadId": { "type": "string" + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/v2/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this forked thread." } }, "required": [ @@ -17126,6 +17148,14 @@ ], "type": "string" }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadSourceKind": { "enum": [ "cli", @@ -17260,6 +17290,17 @@ "type": "null" } ] + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/v2/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this thread." } }, "title": "ThreadStartParams", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 5cecb71ed5..8382944c8e 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -13118,6 +13118,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -13332,6 +13343,17 @@ }, "threadId": { "type": "string" + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this forked thread." } }, "required": [ @@ -15012,6 +15034,14 @@ ], "type": "string" }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadSourceKind": { "enum": [ "cli", @@ -15146,6 +15176,17 @@ "type": "null" } ] + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this thread." } }, "title": "ThreadStartParams", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index 970e2fe9ca..6419bc9422 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -137,6 +137,14 @@ "flex" ], "type": "string" + }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" } }, "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.", @@ -232,6 +240,17 @@ }, "threadId": { "type": "string" + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this forked thread." } }, "required": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 00689feda0..05f9849df3 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -1419,6 +1419,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -2117,6 +2128,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 4db2ae4642..aaeb3a5ab2 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index 003c75e597..6237f8b746 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 50147feca1..4a64c50252 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index ff774402b9..81eff83400 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -1419,6 +1419,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -2117,6 +2128,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index 75b08d53d6..f495e4cdfe 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json index d5f0e9bfcc..aa5029afd9 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json @@ -172,6 +172,14 @@ ], "type": "string" }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStartSource": { "enum": [ "startup", @@ -312,6 +320,17 @@ "type": "null" } ] + }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional client-supplied analytics source classification for this thread." } }, "title": "ThreadStartParams", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index a0f39a29f6..b2d5cbfa08 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -1419,6 +1419,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -2117,6 +2128,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index ff7c4a5320..98c163a41e 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index 1b5aa29683..5e26982d51 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -869,6 +869,17 @@ ], "description": "Current runtime status for the thread." }, + "threadSource": { + "anyOf": [ + { + "$ref": "#/definitions/ThreadSource" + }, + { + "type": "null" + } + ], + "description": "Optional analytics source classification for this thread." + }, "turns": { "description": "Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read` (when `includeTurns` is true) responses. For all other responses and notifications returning a Thread, the turns field will be an empty list.", "items": { @@ -1567,6 +1578,14 @@ } ] }, + "ThreadSource": { + "enum": [ + "user", + "subagent", + "memory_consolidation" + ], + "type": "string" + }, "ThreadStatus": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts b/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts index 8c4c9394bf..99e622565a 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts @@ -4,6 +4,7 @@ import type { AbsolutePathBuf } from "../AbsolutePathBuf"; import type { GitInfo } from "./GitInfo"; import type { SessionSource } from "./SessionSource"; +import type { ThreadSource } from "./ThreadSource"; import type { ThreadStatus } from "./ThreadStatus"; import type { Turn } from "./Turn"; @@ -52,6 +53,10 @@ cliVersion: string, * Origin of the thread (CLI, VSCode, codex exec, codex app-server, etc.). */ source: SessionSource, +/** + * Optional analytics source classification for this thread. + */ +threadSource: ThreadSource | null, /** * Optional random unique nickname assigned to an AgentControl-spawned sub-agent. */ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index ba7119e9ed..ea67b491ad 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -6,6 +6,7 @@ import type { JsonValue } from "../serde_json/JsonValue"; import type { ApprovalsReviewer } from "./ApprovalsReviewer"; import type { AskForApproval } from "./AskForApproval"; import type { SandboxMode } from "./SandboxMode"; +import type { ThreadSource } from "./ThreadSource"; /** * There are two ways to fork a thread: @@ -23,4 +24,7 @@ model?: string | null, modelProvider?: string | null, serviceTier?: ServiceTier * Override where approval requests are routed for review on this thread * and subsequent turns. */ -approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, ephemeral?: boolean}; +approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, ephemeral?: boolean, /** + * Optional client-supplied analytics source classification for this forked thread. + */ +threadSource?: ThreadSource | null}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSource.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSource.ts new file mode 100644 index 0000000000..8f55524801 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSource.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadSource = "user" | "subagent" | "memory_consolidation"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts index 374ac2e681..cecc183f92 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts @@ -7,10 +7,14 @@ import type { JsonValue } from "../serde_json/JsonValue"; import type { ApprovalsReviewer } from "./ApprovalsReviewer"; import type { AskForApproval } from "./AskForApproval"; import type { SandboxMode } from "./SandboxMode"; +import type { ThreadSource } from "./ThreadSource"; import type { ThreadStartSource } from "./ThreadStartSource"; export type ThreadStartParams = {model?: string | null, modelProvider?: string | null, serviceTier?: ServiceTier | null | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, /** * Override where approval requests are routed for review on this thread * and subsequent turns. */ -approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, serviceName?: string | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, sessionStartSource?: ThreadStartSource | null}; +approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, serviceName?: string | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, sessionStartSource?: ThreadStartSource | null, /** + * Optional client-supplied analytics source classification for this thread. + */ +threadSource?: ThreadSource | null}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 547f0f1018..4998cdc710 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -396,6 +396,7 @@ export type { ThreadSetNameResponse } from "./ThreadSetNameResponse"; export type { ThreadShellCommandParams } from "./ThreadShellCommandParams"; export type { ThreadShellCommandResponse } from "./ThreadShellCommandResponse"; export type { ThreadSortKey } from "./ThreadSortKey"; +export type { ThreadSource } from "./ThreadSource"; export type { ThreadSourceKind } from "./ThreadSourceKind"; export type { ThreadStartParams } from "./ThreadStartParams"; export type { ThreadStartResponse } from "./ThreadStartResponse"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 5ab2e5ea01..9d425d8ab4 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -2184,6 +2184,7 @@ mod tests { cwd: cwd.clone(), cli_version: "0.0.0".to_string(), source: v2::SessionSource::Exec, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -2226,6 +2227,7 @@ mod tests { "cwd": absolute_path_string("tmp"), "cliVersion": "0.0.0", "source": "exec", + "threadSource": null, "agentNickname": null, "agentRole": null, "gitInfo": null, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index ad125da4e6..578ef9193f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -6,6 +6,7 @@ use super::PermissionProfileSelectionParams; use super::SandboxMode; use super::SandboxPolicy; use super::Thread; +use super::ThreadSource; use super::Turn; use super::TurnEnvironmentParams; use super::shared::v2_enum_from_core; @@ -134,6 +135,9 @@ pub struct ThreadStartParams { pub ephemeral: Option, #[ts(optional = nullable)] pub session_start_source: Option, + /// Optional client-supplied analytics source classification for this thread. + #[ts(optional = nullable)] + pub thread_source: Option, /// Optional sticky environments for this thread. /// /// Omitted selects the default environment when environment access is @@ -388,6 +392,9 @@ pub struct ThreadForkParams { pub developer_instructions: Option, #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub ephemeral: bool, + /// Optional client-supplied analytics source classification for this forked thread. + #[ts(optional = nullable)] + pub thread_source: Option, /// When true, return only thread metadata and live fork state without /// populating `thread.turns`. This is useful when the client plans to call /// `thread/turns/list` immediately after forking. diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs index 07c21e3906..cb02705849 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs @@ -4,6 +4,7 @@ use super::ThreadStatus; use super::TurnStatus; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource; +use codex_protocol::protocol::ThreadSource as CoreThreadSource; use codex_utils_absolute_path::AbsolutePathBuf; use schemars::JsonSchema; use serde::Deserialize; @@ -60,6 +61,35 @@ impl From for CoreSessionSource { } } +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = "snake_case", export_to = "v2/")] +pub enum ThreadSource { + User, + Subagent, + MemoryConsolidation, +} + +impl From for ThreadSource { + fn from(value: CoreThreadSource) -> Self { + match value { + CoreThreadSource::User => ThreadSource::User, + CoreThreadSource::Subagent => ThreadSource::Subagent, + CoreThreadSource::MemoryConsolidation => ThreadSource::MemoryConsolidation, + } + } +} + +impl From for CoreThreadSource { + fn from(value: ThreadSource) -> Self { + match value { + ThreadSource::User => CoreThreadSource::User, + ThreadSource::Subagent => CoreThreadSource::Subagent, + ThreadSource::MemoryConsolidation => CoreThreadSource::MemoryConsolidation, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -98,6 +128,8 @@ pub struct Thread { pub cli_version: String, /// Origin of the thread (CLI, VSCode, codex exec, codex app-server, etc.). pub source: SessionSource, + /// Optional analytics source classification for this thread. + pub thread_source: Option, /// Optional random unique nickname assigned to an AgentControl-spawned sub-agent. pub agent_nickname: Option, /// Optional role (agent_role) assigned to an AgentControl-spawned sub-agent. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 18fcc14091..6ff7f5f03f 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -2182,6 +2182,7 @@ mod tests { cwd: test_path_buf("/tmp").abs().into(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, diff --git a/codex-rs/app-server/src/request_processors/external_agent_config_processor.rs b/codex-rs/app-server/src/request_processors/external_agent_config_processor.rs index ca7f8a9d4c..1c741944b5 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_config_processor.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_config_processor.rs @@ -307,6 +307,7 @@ impl ExternalAgentConfigRequestProcessor { config, initial_history: InitialHistory::Forked(rollout_items), session_source: None, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, metrics_service_name: None, diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index a9c7ab90d1..321d1703dc 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -746,6 +746,7 @@ impl ThreadRequestProcessor { personality, ephemeral, session_start_source, + thread_source, environments, persist_extended_history, } = params; @@ -799,6 +800,7 @@ impl ThreadRequestProcessor { typesafe_overrides, dynamic_tools, session_start_source, + thread_source.map(Into::into), environment_selections, service_name, experimental_raw_events, @@ -882,6 +884,7 @@ impl ThreadRequestProcessor { typesafe_overrides: ConfigOverrides, dynamic_tools: Option>, session_start_source: Option, + thread_source: Option, environments: Option>, service_name: Option, experimental_raw_events: bool, @@ -998,6 +1001,7 @@ impl ThreadRequestProcessor { codex_app_server_protocol::ThreadStartSource::Clear => InitialHistory::Cleared, }, session_source: None, + thread_source, dynamic_tools: core_dynamic_tools, persist_extended_history: false, metrics_service_name: service_name, @@ -2382,6 +2386,11 @@ impl ThreadRequestProcessor { return Ok(()); } }; + thread.thread_source = codex_thread + .config_snapshot() + .await + .thread_source + .map(Into::into); self.thread_watch_manager .upsert_thread(thread.clone()) @@ -2869,6 +2878,7 @@ impl ThreadRequestProcessor { base_instructions, developer_instructions, ephemeral, + thread_source, exclude_turns, persist_extended_history, } = params; @@ -2959,6 +2969,7 @@ impl ThreadRequestProcessor { history: history_items.clone(), rollout_path: source_thread.rollout_path.clone(), }), + thread_source.map(Into::into), /*persist_extended_history*/ false, self.request_trace_context(&request_id).await, ) @@ -3018,6 +3029,11 @@ impl ThreadRequestProcessor { } thread }; + thread.thread_source = forked_thread + .config_snapshot() + .await + .thread_source + .map(Into::into); self.thread_watch_manager .upsert_thread_silently(thread.clone()) @@ -3620,6 +3636,7 @@ pub(crate) fn thread_from_stored_thread( agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), source: source.into(), + thread_source: thread.thread_source.map(Into::into), git_info, name: thread.name, turns: Vec::new(), @@ -3682,6 +3699,7 @@ fn summary_from_state_db_metadata( cwd: PathBuf, cli_version: String, source: String, + _thread_source: Option, agent_nickname: Option, agent_role: Option, git_sha: Option, @@ -3732,6 +3750,7 @@ fn summary_from_thread_metadata(metadata: &ThreadMetadata) -> ConversationSummar metadata.cwd.clone(), metadata.cli_version.clone(), metadata.source.clone(), + metadata.thread_source, metadata.agent_nickname.clone(), metadata.agent_role.clone(), metadata.git_sha.clone(), @@ -3815,6 +3834,7 @@ fn build_thread_from_snapshot( agent_nickname: config_snapshot.session_source.get_nickname(), agent_role: config_snapshot.session_source.get_agent_role(), source: config_snapshot.session_source.clone().into(), + thread_source: config_snapshot.thread_source.map(Into::into), git_info: None, name: None, turns: Vec::new(), diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 4f3e476e48..6bcbda478c 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -283,6 +283,7 @@ mod thread_processor_behavior_tests { cwd: PathBuf::from("/tmp"), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: Some(codex_protocol::protocol::ThreadSource::User), agent_nickname: None, agent_role: None, agent_path: None, @@ -540,6 +541,7 @@ mod thread_processor_behavior_tests { reasoning_effort: None, personality: None, session_source: SessionSource::Cli, + thread_source: None, }; assert_eq!( @@ -828,6 +830,7 @@ mod thread_processor_behavior_tests { agent_nickname: None, agent_role: None, }), + thread_source: Some(codex_protocol::protocol::ThreadSource::Subagent), agent_nickname: Some("atlas".to_string()), agent_role: Some("explorer".to_string()), model_provider: Some("test-provider".to_string()), @@ -849,6 +852,7 @@ mod thread_processor_behavior_tests { assert_eq!(thread.agent_nickname, Some("atlas".to_string())); assert_eq!(thread.agent_role, Some("explorer".to_string())); + assert_eq!(thread.thread_source, None); Ok(()) } @@ -975,6 +979,7 @@ mod thread_processor_behavior_tests { PathBuf::from("/"), "0.0.0".to_string(), source, + Some(codex_protocol::protocol::ThreadSource::Subagent), Some("atlas".to_string()), Some("explorer".to_string()), /*git_sha*/ None, diff --git a/codex-rs/app-server/src/request_processors/thread_summary.rs b/codex-rs/app-server/src/request_processors/thread_summary.rs index bb02affcc8..be3000e36a 100644 --- a/codex-rs/app-server/src/request_processors/thread_summary.rs +++ b/codex-rs/app-server/src/request_processors/thread_summary.rs @@ -278,6 +278,7 @@ pub(crate) fn summary_to_thread( agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), source: source.into(), + thread_source: None, git_info, name: None, turns: Vec::new(), diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 05997815ef..a3783f995d 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -904,6 +904,7 @@ impl TurnRequestProcessor { history: parent_history.items, rollout_path: parent_thread.rollout_path(), }), + /*thread_source*/ None, /*persist_extended_history*/ false, self.request_trace_context(request_id).await, ) diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index b1373c293d..47da7e8cad 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -902,6 +902,7 @@ mod tests { agent_nickname: None, agent_role: None, source, + thread_source: None, git_info: None, name: None, turns: Vec::new(), diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index 06b273754c..6b2a9a0abe 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -138,6 +138,7 @@ pub fn create_fake_rollout_with_source( originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -221,6 +222,7 @@ pub fn create_fake_rollout_with_text_elements( originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/app-server/tests/suite/v2/analytics.rs b/codex-rs/app-server/tests/suite/v2/analytics.rs index 862721a154..c6f95af95d 100644 --- a/codex-rs/app-server/tests/suite/v2/analytics.rs +++ b/codex-rs/app-server/tests/suite/v2/analytics.rs @@ -170,6 +170,7 @@ pub(crate) fn assert_basic_thread_initialized_event( thread_id: &str, expected_model: &str, initialization_mode: &str, + expected_thread_source: &str, ) { assert_eq!(event["event_params"]["thread_id"], thread_id); assert_eq!( @@ -186,7 +187,10 @@ pub(crate) fn assert_basic_thread_initialized_event( ); assert_eq!(event["event_params"]["model"], expected_model); assert_eq!(event["event_params"]["ephemeral"], false); - assert_eq!(event["event_params"]["thread_source"], "user"); + assert_eq!( + event["event_params"]["thread_source"], + expected_thread_source + ); assert_eq!( event["event_params"]["subagent_source"], serde_json::Value::Null diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index b95adb9044..6b98e39b53 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -675,6 +675,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( personality: None, ephemeral: None, session_start_source: None, + thread_source: None, dynamic_tools: None, environments: None, mock_experimental_field: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index fd773f2e30..5c4ba3a3e3 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -17,6 +17,7 @@ use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; @@ -90,6 +91,7 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { let fork_id = mcp .send_thread_fork_request(ThreadForkParams { thread_id: conversation_id.clone(), + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -128,6 +130,7 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { assert_ne!(thread_path.as_path(), original_path); assert!(thread.cwd.as_path().is_absolute()); assert_eq!(thread.source, SessionSource::VsCode); + assert_eq!(thread.thread_source, Some(ThreadSource::User)); assert_eq!(thread.name, None); assert_eq!( @@ -188,6 +191,13 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { Some(&json!([])), "thread/started must not emit copied fork turns" ); + assert_eq!( + started_thread_json + .get("threadSource") + .and_then(Value::as_str), + Some("user"), + "thread/started should preserve the caller-supplied fork origin" + ); let started: ThreadStartedNotification = serde_json::from_value(notif.params.expect("params must be present"))?; let mut expected_started_thread = thread; @@ -299,6 +309,7 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> let fork_id = mcp .send_thread_fork_request(ThreadForkParams { thread_id: conversation_id, + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -403,6 +414,7 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { let fork_id = mcp .send_thread_fork_request(ThreadForkParams { thread_id: conversation_id, + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -415,7 +427,7 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; let event = thread_initialized_event(&payload)?; - assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "forked"); + assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "forked", "user"); Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 0dc616dc86..0e9a7a9a05 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -1119,6 +1119,7 @@ async fn seed_pathless_store_thread( thread_id, forked_from_id: None, source: ProtocolSessionSource::Cli, + thread_source: None, base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 2e6665e091..4014e6f975 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -38,6 +38,7 @@ use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadReadResponse; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; @@ -240,15 +241,20 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?; mount_analytics_capture(&server, codex_home.path()).await?; - let conversation_id = create_fake_rollout_with_text_elements( + let conversation_id = create_fake_rollout( codex_home.path(), "2025-01-05T12-00-00", "2025-01-05T12:00:00Z", "Saved user message", - Vec::new(), Some("mock_provider"), /*git_info*/ None, )?; + set_thread_source_on_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + &conversation_id, + "user", + )?; let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; @@ -265,10 +271,31 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { ) .await??; let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + assert_eq!(thread.thread_source, Some(ThreadSource::User)); let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; let event = thread_initialized_event(&payload)?; - assert_basic_thread_initialized_event(event, &thread.id, "gpt-5.3-codex", "resumed"); + assert_basic_thread_initialized_event(event, &thread.id, "gpt-5.3-codex", "resumed", "user"); + assert_eq!(event["event_params"]["thread_source"], "user"); + Ok(()) +} + +fn set_thread_source_on_fake_rollout( + codex_home: &std::path::Path, + filename_ts: &str, + thread_id: &str, + thread_source: &str, +) -> Result<()> { + let path = rollout_path(codex_home, filename_ts, thread_id); + let contents = std::fs::read_to_string(&path)?; + let mut lines = contents.lines(); + let session_meta = lines + .next() + .ok_or_else(|| anyhow::anyhow!("fake rollout missing session meta"))?; + let mut session_meta: serde_json::Value = serde_json::from_str(session_meta)?; + session_meta["payload"]["thread_source"] = serde_json::json!(thread_source); + let remaining = lines.collect::>().join("\n"); + std::fs::write(&path, format!("{session_meta}\n{remaining}\n"))?; Ok(()) } @@ -1179,6 +1206,7 @@ stream_max_retries = 0 originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source: RolloutSessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 2eba334774..68fc818ec2 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -15,6 +15,7 @@ use codex_app_server_protocol::McpServerStatusUpdatedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SandboxMode; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; @@ -107,6 +108,7 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { let req_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.2".to_string()), + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -138,6 +140,7 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { "new persistent threads should not be ephemeral" ); assert_eq!(thread.status, ThreadStatus::Idle); + assert_eq!(thread.thread_source, Some(ThreadSource::User)); let thread_path = thread.path.clone().expect("thread path should be present"); assert!(thread_path.is_absolute(), "thread path should be absolute"); assert!( @@ -160,6 +163,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Some(false), "new persistent threads should serialize `ephemeral: false`" ); + assert_eq!( + thread_json.get("threadSource").and_then(Value::as_str), + Some("user"), + "new threads should serialize the caller-supplied thread origin" + ); assert_eq!(thread.name, None); // A corresponding thread/started notification should arrive. @@ -201,6 +209,13 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Some(false), "thread/started should serialize `ephemeral: false` for new persistent threads" ); + assert_eq!( + started_thread_json + .get("threadSource") + .and_then(Value::as_str), + Some("user"), + "thread/started should preserve the caller-supplied thread origin" + ); let started: ThreadStartedNotification = serde_json::from_value(notif.params.expect("params must be present"))?; assert_eq!(started.thread, thread); @@ -312,7 +327,10 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp - .send_thread_start_request(ThreadStartParams::default()) + .send_thread_start_request(ThreadStartParams { + thread_source: Some(ThreadSource::User), + ..Default::default() + }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, @@ -324,7 +342,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; assert_eq!(payload["events"].as_array().expect("events array").len(), 1); let event = thread_initialized_event(&payload)?; - assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "new"); + assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "new", "user"); Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 952d6069df..971b081ae5 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -40,6 +40,7 @@ use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerRequestResolvedNotification; use codex_app_server_protocol::TextElement; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnCompletedNotification; @@ -116,6 +117,7 @@ async fn turn_start_sends_originator_header() -> Result<()> { let thread_req = mcp .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -183,6 +185,7 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> { let thread_req = mcp .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; @@ -464,6 +467,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> { let thread_req = mcp .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), + thread_source: Some(ThreadSource::User), ..Default::default() }) .await?; diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 3c98fd15d7..e5967a6ed2 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -27,6 +27,7 @@ use codex_protocol::protocol::ResumedHistory; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; use codex_thread_store::ReadThreadParams; @@ -234,6 +235,7 @@ impl AgentControl { config.clone(), self.clone(), session_source, + /*thread_source*/ Some(ThreadSource::Subagent), /*persist_extended_history*/ false, /*metrics_service_name*/ None, inherited_shell_snapshot, @@ -420,6 +422,7 @@ impl AgentControl { InitialHistory::Forked(forked_rollout_items), self.clone(), session_source, + /*thread_source*/ Some(ThreadSource::Subagent), /*persist_extended_history*/ false, inherited_shell_snapshot, inherited_exec_policy, diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 6aae36d2e7..572cffcabe 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -16,6 +16,7 @@ use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::Submission; +use codex_protocol::protocol::ThreadSource; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionsArgs; use codex_protocol::request_permissions::RequestPermissionsEvent; @@ -84,6 +85,7 @@ pub(crate) async fn run_codex_thread_interactive( skills_watcher: Arc::clone(&parent_session.services.skills_watcher), conversation_history: initial_history.unwrap_or(InitialHistory::New), session_source: SessionSource::SubAgent(subagent_source.clone()), + thread_source: Some(ThreadSource::Subagent), agent_control: parent_session.services.agent_control.clone(), dynamic_tools: Vec::new(), persist_extended_history: false, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 3d1ce3ea76..2f74bd48b4 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -30,6 +30,7 @@ use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::Submission; use codex_protocol::protocol::ThreadMemoryMode; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; @@ -62,6 +63,7 @@ pub struct ThreadConfigSnapshot { pub reasoning_effort: Option, pub personality: Option, pub session_source: SessionSource, + pub thread_source: Option, } impl ThreadConfigSnapshot { diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index 63faae418a..3de22ba1d3 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -73,6 +73,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index 9c1eb3af4b..f36239f2ae 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -49,6 +49,7 @@ fn stored_thread(cwd: &str, title: &str, first_user_message: &str) -> StoredThre cwd: PathBuf::from(cwd), cli_version: "test".to_string(), source: SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 4bb57d3f7d..948678067e 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -110,6 +110,7 @@ use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnContextNetworkItem; @@ -391,6 +392,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) skills_watcher: Arc, pub(crate) conversation_history: InitialHistory, pub(crate) session_source: SessionSource, + pub(crate) thread_source: Option, pub(crate) agent_control: AgentControl, pub(crate) dynamic_tools: Vec, pub(crate) persist_extended_history: bool, @@ -453,6 +455,7 @@ impl Codex { skills_watcher, conversation_history, session_source, + thread_source, agent_control, dynamic_tools, persist_extended_history, @@ -612,6 +615,7 @@ impl Codex { app_server_client_name: None, app_server_client_version: None, session_source, + thread_source, dynamic_tools, persist_extended_history, inherited_shell_snapshot, diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index 67c1e33310..bd50d74bb6 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -103,7 +103,7 @@ pub(super) async fn spawn_review_thread( let review_turn_id = sub_id.to_string(); let turn_metadata_state = Arc::new(TurnMetadataState::new( sess.conversation_id.to_string(), - &session_source, + parent_turn_context.thread_source, review_turn_id.clone(), parent_turn_context.cwd.clone(), &parent_turn_context.permission_profile, @@ -123,6 +123,7 @@ pub(super) async fn spawn_review_thread( reasoning_effort, reasoning_summary, session_source, + thread_source: parent_turn_context.thread_source, environments: parent_turn_context.environments.clone(), tools_config, features: parent_turn_context.features.clone(), diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 23621f87cf..d593544b49 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -2,6 +2,7 @@ use super::*; use crate::goals::GoalRuntimeState; use codex_protocol::permissions::FileSystemPath; use codex_protocol::permissions::FileSystemSpecialPath; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use tokio::sync::Semaphore; @@ -86,6 +87,8 @@ pub(crate) struct SessionConfiguration { pub(super) app_server_client_version: Option, /// Source of the session (cli, vscode, exec, mcp, ...) pub(super) session_source: SessionSource, + /// Optional analytics source classification for this thread. + pub(super) thread_source: Option, pub(super) dynamic_tools: Vec, pub(super) persist_extended_history: bool, pub(super) inherited_shell_snapshot: Option>, @@ -141,6 +144,7 @@ impl SessionConfiguration { reasoning_effort: self.collaboration_mode.reasoning_effort(), personality: self.personality, session_source: self.session_source.clone(), + thread_source: self.thread_source, } } @@ -393,6 +397,7 @@ impl Session { thread_id: conversation_id, forked_from_id, source: session_source, + thread_source: session_configuration.thread_source, base_instructions: BaseInstructions { text: session_configuration.base_instructions.clone(), }, @@ -875,6 +880,7 @@ impl Session { msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id: conversation_id, forked_from_id, + thread_source: session_configuration.thread_source, thread_name: session_configuration.thread_name.clone(), model: session_configuration.collaboration_mode.model().to_string(), model_provider_id: config.model_provider_id.clone(), diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index f3a8d95107..3875f8fb57 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -1722,6 +1722,7 @@ async fn fork_startup_context_then_first_turn_diff_snapshot() -> anyhow::Result< usize::MAX, fork_config.clone(), rollout_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -2411,6 +2412,7 @@ async fn set_rate_limits_retains_previous_credits() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -2514,6 +2516,7 @@ async fn set_rate_limits_updates_plan_type_when_present() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -2766,6 +2769,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { thread_id: session.conversation_id, forked_from_id: None, source: SessionSource::Exec, + thread_source: None, base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), metadata: ThreadPersistenceMetadata { @@ -2972,6 +2976,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -3495,6 +3500,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -3607,6 +3613,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -3829,6 +3836,7 @@ async fn make_session_with_config_and_rx( app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, inherited_shell_snapshot: None, @@ -4793,6 +4801,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { thread_id: session.conversation_id, forked_from_id: None, source: SessionSource::Exec, + thread_source: None, base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), metadata: ThreadPersistenceMetadata { @@ -5116,6 +5125,7 @@ where app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + thread_source: None, dynamic_tools, persist_extended_history: false, inherited_shell_snapshot: None, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index d7ec5937d2..8f8577e7fe 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -752,6 +752,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), )), + thread_source: None, agent_control: AgentControl::default(), dynamic_tools: Vec::new(), persist_extended_history: false, diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index 00ff985861..e42fc2b102 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -5,6 +5,7 @@ use crate::environment_selection::ResolvedTurnEnvironments; use codex_model_provider::SharedModelProvider; use codex_model_provider::create_model_provider; use codex_protocol::models::AdditionalPermissionProfile; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_sandboxing::compatibility_sandbox_policy_for_permission_profile; use codex_sandboxing::policy_transforms::effective_file_system_sandbox_policy; @@ -63,6 +64,7 @@ pub(crate) struct TurnContext { pub(crate) reasoning_effort: Option, pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, + pub(crate) thread_source: Option, pub(crate) environments: ResolvedTurnEnvironments, /// The session's absolute working directory. All relative paths provided /// by the model as well as sandbox policies are resolved against this path @@ -248,6 +250,7 @@ impl TurnContext { reasoning_effort, reasoning_summary: self.reasoning_summary, session_source: self.session_source.clone(), + thread_source: self.thread_source, environments: self.environments.clone(), cwd: self.cwd.clone(), current_date: self.current_date.clone(), @@ -520,7 +523,7 @@ impl Session { let per_turn_config = Arc::new(per_turn_config); let turn_metadata_state = Arc::new(TurnMetadataState::new( conversation_id.to_string(), - &session_source, + session_configuration.thread_source, sub_id.clone(), cwd.clone(), &session_configuration.permission_profile(), @@ -540,6 +543,7 @@ impl Session { reasoning_effort, reasoning_summary, session_source, + thread_source: session_configuration.thread_source, environments, cwd, current_date: Some(current_date), diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 06f8118db7..03a3ec92fc 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -48,6 +48,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnEnvironmentSelection; @@ -219,6 +220,7 @@ pub struct StartThreadOptions { pub config: Config, pub initial_history: InitialHistory, pub session_source: Option, + pub thread_source: Option, pub dynamic_tools: Vec, pub persist_extended_history: bool, pub metrics_service_name: Option, @@ -620,6 +622,7 @@ impl ThreadManager { config, initial_history: InitialHistory::New, session_source: None, + thread_source: None, dynamic_tools, persist_extended_history, metrics_service_name: None, @@ -636,12 +639,16 @@ impl ThreadManager { let session_source = options .session_source .unwrap_or_else(|| self.state.session_source.clone()); + let thread_source = options + .thread_source + .or_else(|| options.initial_history.get_resumed_thread_source()); Box::pin(self.state.spawn_thread_with_source( options.config, options.initial_history, Arc::clone(&self.state.auth_manager), self.agent_control(), session_source, + thread_source, options.dynamic_tools, options.persist_extended_history, options.metrics_service_name, @@ -684,11 +691,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let thread_source = initial_history.get_resumed_thread_source(); Box::pin(self.state.spawn_thread( config, initial_history, auth_manager, self.agent_control(), + thread_source, Vec::new(), persist_extended_history, /*metrics_service_name*/ None, @@ -713,6 +722,7 @@ impl ThreadManager { InitialHistory::New, Arc::clone(&self.state.auth_manager), self.agent_control(), + /*thread_source*/ None, Vec::new(), /*persist_extended_history*/ false, /*metrics_service_name*/ None, @@ -735,11 +745,13 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let thread_source = initial_history.get_resumed_thread_source(); Box::pin(self.state.spawn_thread( config, initial_history, auth_manager, self.agent_control(), + thread_source, Vec::new(), /*persist_extended_history*/ false, /*metrics_service_name*/ None, @@ -817,6 +829,7 @@ impl ThreadManager { snapshot: S, config: Config, path: PathBuf, + thread_source: Option, persist_extended_history: bool, parent_trace: Option, ) -> CodexResult @@ -829,6 +842,7 @@ impl ThreadManager { snapshot, config, history, + thread_source, persist_extended_history, parent_trace, ) @@ -841,6 +855,7 @@ impl ThreadManager { snapshot: S, config: Config, history: InitialHistory, + thread_source: Option, persist_extended_history: bool, parent_trace: Option, ) -> CodexResult @@ -851,6 +866,7 @@ impl ThreadManager { snapshot.into(), config, history, + thread_source, persist_extended_history, parent_trace, ) @@ -862,6 +878,7 @@ impl ThreadManager { snapshot: ForkSnapshot, config: Config, history: InitialHistory, + thread_source: Option, persist_extended_history: bool, parent_trace: Option, ) -> CodexResult { @@ -876,6 +893,7 @@ impl ThreadManager { history, Arc::clone(&self.state.auth_manager), self.agent_control(), + thread_source, Vec::new(), persist_extended_history, /*metrics_service_name*/ None, @@ -991,6 +1009,7 @@ impl ThreadManagerState { config, agent_control, self.session_source.clone(), + /*thread_source*/ None, /*persist_extended_history*/ false, /*metrics_service_name*/ None, /*inherited_shell_snapshot*/ None, @@ -1006,6 +1025,7 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, session_source: SessionSource, + thread_source: Option, persist_extended_history: bool, metrics_service_name: Option, inherited_shell_snapshot: Option>, @@ -1021,6 +1041,7 @@ impl ThreadManagerState { Arc::clone(&self.auth_manager), agent_control, session_source, + thread_source, Vec::new(), persist_extended_history, metrics_service_name, @@ -1047,12 +1068,14 @@ impl ThreadManagerState { } = options; let environments = default_thread_environment_selections(self.environment_manager.as_ref(), &config.cwd); + let thread_source = initial_history.get_resumed_thread_source(); Box::pin(self.spawn_thread_with_source( config, initial_history, Arc::clone(&self.auth_manager), agent_control, session_source, + thread_source, Vec::new(), /*persist_extended_history*/ false, /*metrics_service_name*/ None, @@ -1072,6 +1095,7 @@ impl ThreadManagerState { initial_history: InitialHistory, agent_control: AgentControl, session_source: SessionSource, + thread_source: Option, persist_extended_history: bool, inherited_shell_snapshot: Option>, inherited_exec_policy: Option>, @@ -1086,6 +1110,7 @@ impl ThreadManagerState { Arc::clone(&self.auth_manager), agent_control, session_source, + thread_source, Vec::new(), persist_extended_history, /*metrics_service_name*/ None, @@ -1106,6 +1131,7 @@ impl ThreadManagerState { initial_history: InitialHistory, auth_manager: Arc, agent_control: AgentControl, + thread_source: Option, dynamic_tools: Vec, persist_extended_history: bool, metrics_service_name: Option, @@ -1119,6 +1145,7 @@ impl ThreadManagerState { auth_manager, agent_control, self.session_source.clone(), + thread_source, dynamic_tools, persist_extended_history, metrics_service_name, @@ -1139,6 +1166,7 @@ impl ThreadManagerState { auth_manager: Arc, agent_control: AgentControl, session_source: SessionSource, + thread_source: Option, dynamic_tools: Vec, persist_extended_history: bool, metrics_service_name: Option, @@ -1202,6 +1230,7 @@ impl ThreadManagerState { skills_watcher: Arc::clone(&self.skills_watcher), conversation_history: initial_history, session_source, + thread_source, agent_control, dynamic_tools, persist_extended_history, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 643309aac1..a61a3a3545 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -15,6 +15,7 @@ use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use core_test_support::PathBufExt; @@ -178,6 +179,7 @@ fn fork_thread_accepts_legacy_usize_snapshot_argument() { usize::MAX, config, path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ); @@ -333,6 +335,7 @@ async fn start_thread_accepts_explicit_environment_when_default_environment_is_d config: config.clone(), initial_history: InitialHistory::New, session_source: None, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, metrics_service_name: None, @@ -370,6 +373,7 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() { session_source: Some(SessionSource::Internal( InternalSessionSource::MemoryConsolidation, )), + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, metrics_service_name: None, @@ -424,6 +428,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { config: config.clone(), initial_history: InitialHistory::New, session_source: None, + thread_source: None, dynamic_tools: Vec::new(), persist_extended_history: false, metrics_service_name: None, @@ -480,6 +485,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { ForkSnapshot::Interrupted, config, rollout_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -620,6 +626,86 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() { .expect("shutdown resumed thread"); } +#[tokio::test] +async fn resume_stopped_thread_from_rollout_preserves_thread_source() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let manager = ThreadManager::new( + &config, + auth_manager.clone(), + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + /*analytics_events_client*/ None, + state_db, + thread_store, + agent_graph_store, + ); + + let source = manager + .start_thread_with_options(StartThreadOptions { + config: config.clone(), + initial_history: InitialHistory::New, + session_source: None, + thread_source: Some(ThreadSource::User), + dynamic_tools: Vec::new(), + persist_extended_history: false, + metrics_service_name: None, + parent_trace: None, + environments: Vec::new(), + }) + .await + .expect("start source thread"); + source.thread.ensure_rollout_materialized().await; + source + .thread + .flush_rollout() + .await + .expect("flush source rollout"); + let rollout_path = source + .thread + .rollout_path() + .expect("source rollout path should exist"); + source + .thread + .shutdown_and_wait() + .await + .expect("shutdown source thread before resume"); + let _ = manager.remove_thread(&source.thread_id).await; + + let resumed = manager + .resume_thread_from_rollout( + config, + rollout_path, + auth_manager, + /*parent_trace*/ None, + ) + .await + .expect("resume source thread"); + + assert_eq!( + resumed + .thread + .config_snapshot() + .await + .thread_source + .as_ref(), + Some(&ThreadSource::User) + ); + + resumed + .thread + .shutdown_and_wait() + .await + .expect("shutdown resumed thread"); +} + #[tokio::test] async fn new_uses_active_provider_for_model_refresh() { let server = MockServer::start().await; @@ -891,6 +977,7 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor ForkSnapshot::Interrupted, config.clone(), source_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -1007,6 +1094,7 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() { ForkSnapshot::Interrupted, config.clone(), source_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -1088,6 +1176,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ ForkSnapshot::Interrupted, config.clone(), source_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -1128,6 +1217,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ ForkSnapshot::Interrupted, config.clone(), forked_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) diff --git a/codex-rs/core/src/turn_metadata.rs b/codex-rs/core/src/turn_metadata.rs index 11da058b53..d4a46944eb 100644 --- a/codex-rs/core/src/turn_metadata.rs +++ b/codex-rs/core/src/turn_metadata.rs @@ -17,7 +17,7 @@ use codex_git_utils::get_head_commit_hash; use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::models::PermissionProfile; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; -use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadSource; use codex_utils_absolute_path::AbsolutePathBuf; const MODEL_KEY: &str = "model"; @@ -69,7 +69,7 @@ pub(crate) struct TurnMetadataBag { #[serde(default, skip_serializing_if = "Option::is_none")] session_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - thread_source: Option<&'static str>, + thread_source: Option, #[serde(default, skip_serializing_if = "Option::is_none")] turn_id: Option, #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] @@ -115,7 +115,7 @@ fn merge_turn_metadata( fn build_turn_metadata_bag( session_id: Option, - thread_source: Option<&'static str>, + thread_source: Option, turn_id: Option, sandbox: Option, repo_root: Option, @@ -187,7 +187,7 @@ pub(crate) struct TurnMetadataState { impl TurnMetadataState { pub(crate) fn new( session_id: String, - session_source: &SessionSource, + thread_source: Option, turn_id: String, cwd: AbsolutePathBuf, permission_profile: &PermissionProfile, @@ -205,7 +205,7 @@ impl TurnMetadataState { ); let base_metadata = build_turn_metadata_bag( Some(session_id), - session_source.thread_source_name(), + thread_source, Some(turn_id), sandbox, /*repo_root*/ None, diff --git a/codex-rs/core/src/turn_metadata_tests.rs b/codex-rs/core/src/turn_metadata_tests.rs index d16ed27fb4..d840ade91b 100644 --- a/codex-rs/core/src/turn_metadata_tests.rs +++ b/codex-rs/core/src/turn_metadata_tests.rs @@ -4,8 +4,7 @@ use crate::sandbox_tags::sandbox_tag; use codex_protocol::models::PermissionProfile; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SandboxPolicy; -use codex_protocol::protocol::SessionSource; -use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadSource; use core_test_support::PathBufExt; use core_test_support::PathExt; use pretty_assertions::assert_eq; @@ -95,7 +94,7 @@ fn turn_metadata_state_uses_platform_sandbox_tag() { let state = TurnMetadataState::new( "session-a".to_string(), - &SessionSource::Exec, + Some(ThreadSource::User), "turn-a".to_string(), cwd, &permission_profile, @@ -117,15 +116,13 @@ fn turn_metadata_state_uses_platform_sandbox_tag() { } #[test] -fn turn_metadata_state_classifies_subagent_thread_source() { +fn turn_metadata_state_uses_explicit_subagent_thread_source() { let temp_dir = TempDir::new().expect("temp dir"); let cwd = temp_dir.path().abs(); let permission_profile = PermissionProfile::read_only(); - let session_source = SessionSource::SubAgent(SubAgentSource::Review); - let state = TurnMetadataState::new( "session-a".to_string(), - &session_source, + Some(ThreadSource::Subagent), "turn-a".to_string(), cwd, &permission_profile, @@ -148,7 +145,7 @@ fn turn_metadata_state_includes_turn_started_at_unix_ms_after_start() { let state = TurnMetadataState::new( "session-a".to_string(), - &SessionSource::Exec, + Some(ThreadSource::User), "turn-a".to_string(), cwd, &permission_profile, @@ -174,7 +171,7 @@ fn turn_metadata_state_includes_model_and_reasoning_effort_only_in_request_meta( let state = TurnMetadataState::new( "session-a".to_string(), - &SessionSource::Exec, + /*thread_source*/ None, "turn-a".to_string(), cwd, &permission_profile, @@ -218,7 +215,7 @@ fn turn_metadata_state_ignores_client_turn_started_at_unix_ms_before_start() { let state = TurnMetadataState::new( "session-a".to_string(), - &SessionSource::Exec, + Some(ThreadSource::User), "turn-a".to_string(), cwd, &permission_profile, @@ -244,7 +241,7 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields( let state = TurnMetadataState::new( "session-a".to_string(), - &SessionSource::Exec, + Some(ThreadSource::User), "turn-a".to_string(), cwd, &permission_profile, diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 56e9893116..893d38ed83 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -452,7 +452,7 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e() initial_parsed .get("thread_source") .and_then(serde_json::Value::as_str), - Some("user") + None ); let git_config_global = cwd.join("empty-git-config"); @@ -565,13 +565,13 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e() first_parsed .get("thread_source") .and_then(serde_json::Value::as_str), - Some("user") + None ); assert_eq!( second_parsed .get("thread_source") .and_then(serde_json::Value::as_str), - Some("user") + None ); assert_eq!( first_turn_id, second_turn_id, diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 354e9a6a03..2788c8ef21 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -855,6 +855,7 @@ async fn fork_thread( nth_user_message, config.clone(), path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, )) diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index 19ed2a2088..91d321b540 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -101,6 +101,7 @@ async fn fork_thread_twice_drops_to_first_message() { ForkSnapshot::TruncateBeforeNthUserMessage(1), config_for_fork.clone(), base_path.clone(), + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -125,6 +126,7 @@ async fn fork_thread_twice_drops_to_first_message() { ForkSnapshot::TruncateBeforeNthUserMessage(0), config_for_fork.clone(), fork1_path.clone(), + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) @@ -197,6 +199,7 @@ async fn fork_thread_from_history_does_not_require_source_rollout_path() { history: source_items.clone(), rollout_path: None, }), + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) diff --git a/codex-rs/core/tests/suite/permissions_messages.rs b/codex-rs/core/tests/suite/permissions_messages.rs index bb93d5cbf8..4d6259a599 100644 --- a/codex-rs/core/tests/suite/permissions_messages.rs +++ b/codex-rs/core/tests/suite/permissions_messages.rs @@ -497,6 +497,7 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> { ForkSnapshot::Interrupted, fork_config.clone(), rollout_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index 8d33ba1705..25415cd7ee 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -89,6 +89,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -134,6 +135,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index d0b39a9283..51b3f2db1a 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -186,6 +186,7 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> thread_id, /*forked_from_id*/ None, SessionSource::Exec, + /*thread_source*/ None, BaseInstructions::default(), Vec::new(), EventPersistenceMode::Limited, diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 8250f5493d..20fa8a8f0e 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -144,6 +144,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { originator: "test".to_string(), cli_version: "test".to_string(), source: SessionSource::default(), + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/core/tests/suite/window_headers.rs b/codex-rs/core/tests/suite/window_headers.rs index de52821839..d0e207d963 100644 --- a/codex-rs/core/tests/suite/window_headers.rs +++ b/codex-rs/core/tests/suite/window_headers.rs @@ -72,6 +72,7 @@ async fn window_id_advances_after_compact_persists_on_resume_and_resets_on_fork( /*snapshot*/ 0usize, resumed.config.clone(), rollout_path, + /*thread_source*/ None, /*persist_extended_history*/ false, /*parent_trace*/ None, ) diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index d61346f1d0..0db63b8034 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -1134,6 +1134,7 @@ fn session_configured_from_thread_response( Ok(SessionConfiguredEvent { session_id, forked_from_id: None, + thread_source: None, thread_name, model, model_provider_id, diff --git a/codex-rs/exec/src/lib_tests.rs b/codex-rs/exec/src/lib_tests.rs index 094da6f936..b35f6e9bfa 100644 --- a/codex-rs/exec/src/lib_tests.rs +++ b/codex-rs/exec/src/lib_tests.rs @@ -255,6 +255,7 @@ fn turn_items_for_thread_returns_matching_turn_items() { cwd: test_path_buf("/tmp/project").abs(), cli_version: "0.0.0-test".to_string(), source: codex_app_server_protocol::SessionSource::Exec, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -477,6 +478,7 @@ fn sample_thread_start_response() -> ThreadStartResponse { cwd: test_path_buf("/tmp").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index 4b01ccccd1..e066cb6ddc 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -108,6 +108,7 @@ fn session_configured_produces_thread_started_event() { session_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8") .expect("thread id should parse"), forked_from_id: None, + thread_source: None, thread_name: None, model: "codex-mini-latest".to_string(), model_provider_id: "test-provider".to_string(), diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index eb66ea0619..1ab6b1a0af 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -298,6 +298,7 @@ mod tests { msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id: thread_id, forked_from_id: None, + thread_source: None, thread_name: None, model: "gpt-4o".to_string(), model_provider_id: "test-provider".to_string(), @@ -343,6 +344,7 @@ mod tests { let session_configured_event = SessionConfiguredEvent { session_id: conversation_id, forked_from_id: None, + thread_source: None, thread_name: None, model: "gpt-4o".to_string(), model_provider_id: "test-provider".to_string(), @@ -411,6 +413,7 @@ mod tests { let session_configured_event = SessionConfiguredEvent { session_id: thread_id, forked_from_id: None, + thread_source: None, thread_name: None, model: "gpt-4o".to_string(), model_provider_id: "test-provider".to_string(), diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 737fb67870..5db8761058 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -24,6 +24,7 @@ use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; use codex_rollout_trace::InferenceTraceContext; @@ -241,6 +242,7 @@ impl MemoryStartupContext { session_source: Some(SessionSource::Internal( InternalSessionSource::MemoryConsolidation, )), + thread_source: Some(ThreadSource::MemoryConsolidation), dynamic_tools: Vec::new(), persist_extended_history: false, metrics_service_name: None, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index e8f4a80d6a..09be839f93 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2503,6 +2503,18 @@ impl InitialHistory { }), } } + + pub fn get_resumed_thread_source(&self) -> Option { + match self { + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, + InitialHistory::Resumed(resumed) => { + resumed.history.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => meta_line.meta.thread_source, + _ => None, + }) + } + } + } } fn session_cwd_from_items(items: &[RolloutItem]) -> Option { @@ -2528,6 +2540,44 @@ pub enum SessionSource { Unknown, } +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = "snake_case")] +pub enum ThreadSource { + User, + Subagent, + MemoryConsolidation, +} + +impl ThreadSource { + pub fn as_str(self) -> &'static str { + match self { + ThreadSource::User => "user", + ThreadSource::Subagent => "subagent", + ThreadSource::MemoryConsolidation => "memory_consolidation", + } + } +} + +impl fmt::Display for ThreadSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +impl FromStr for ThreadSource { + type Err = String; + + fn from_str(value: &str) -> Result { + match value { + "user" => Ok(ThreadSource::User), + "subagent" => Ok(ThreadSource::Subagent), + "memory_consolidation" => Ok(ThreadSource::MemoryConsolidation), + other => Err(format!("unknown thread source: {other}")), + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "snake_case")] #[ts(rename_all = "snake_case")] @@ -2588,16 +2638,6 @@ impl SessionSource { }) } - /// Low cardinality thread source label for analytics. - pub fn thread_source_name(&self) -> Option<&'static str> { - match self { - SessionSource::Cli | SessionSource::VSCode | SessionSource::Exec => Some("user"), - SessionSource::Internal(_) => Some("internal"), - SessionSource::SubAgent(_) => Some("subagent"), - SessionSource::Mcp | SessionSource::Custom(_) | SessionSource::Unknown => None, - } - } - pub fn is_internal(&self) -> bool { matches!(self, SessionSource::Internal(_)) } @@ -2698,6 +2738,9 @@ pub struct SessionMeta { pub cli_version: String, #[serde(default)] pub source: SessionSource, + /// Optional analytics source classification for this thread. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub thread_source: Option, /// Optional random unique nickname assigned to an AgentControl-spawned sub-agent. #[serde(skip_serializing_if = "Option::is_none")] pub agent_nickname: Option, @@ -2728,6 +2771,7 @@ impl Default for SessionMeta { originator: String::new(), cli_version: String::new(), source: SessionSource::default(), + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, @@ -3415,6 +3459,9 @@ pub struct SessionConfiguredEvent { pub session_id: ThreadId, #[serde(skip_serializing_if = "Option::is_none")] pub forked_from_id: Option, + /// Optional analytics source classification for this thread. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub thread_source: Option, /// Optional user-facing thread name (may be unset). #[serde(default, skip_serializing_if = "Option::is_none")] @@ -3486,6 +3533,8 @@ impl<'de> Deserialize<'de> for SessionConfiguredEvent { session_id: ThreadId, forked_from_id: Option, #[serde(default)] + thread_source: Option, + #[serde(default)] thread_name: Option, model: String, model_provider_id: String, @@ -3524,6 +3573,7 @@ impl<'de> Deserialize<'de> for SessionConfiguredEvent { Ok(Self { session_id: wire.session_id, forked_from_id: wire.forked_from_id, + thread_source: wire.thread_source, thread_name: wire.thread_name, model: wire.model, model_provider_id: wire.model_provider_id, @@ -4011,28 +4061,6 @@ mod tests { ); } - #[test] - fn session_source_thread_source_name_classifies_user_and_subagent_sources() { - for (source, expected) in [ - (SessionSource::Cli, Some("user")), - (SessionSource::VSCode, Some("user")), - (SessionSource::Exec, Some("user")), - ( - SessionSource::Internal(InternalSessionSource::MemoryConsolidation), - Some("internal"), - ), - ( - SessionSource::SubAgent(SubAgentSource::Review), - Some("subagent"), - ), - (SessionSource::Mcp, None), - (SessionSource::Custom("atlas".to_string()), None), - (SessionSource::Unknown, None), - ] { - assert_eq!(source.thread_source_name(), expected); - } - } - #[test] fn session_source_restriction_product_defaults_non_subagent_sources_to_codex() { assert_eq!( @@ -5274,6 +5302,7 @@ mod tests { msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id: conversation_id, forked_from_id: None, + thread_source: None, thread_name: None, model: "codex-mini-latest".to_string(), model_provider_id: "openai".to_string(), diff --git a/codex-rs/rollout/src/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs index c94cd0be7e..45db758c65 100644 --- a/codex-rs/rollout/src/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -40,6 +40,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -91,6 +92,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() { originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -350,6 +352,7 @@ fn write_rollout_in_sessions_with_cwd( originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 512c223263..7a5e28a5a3 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -61,6 +61,7 @@ use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadSource; use codex_state::StateRuntime; use codex_state::ThreadMetadataBuilder; use codex_utils_path as path_utils; @@ -88,6 +89,7 @@ pub enum RolloutRecorderParams { conversation_id: ThreadId, forked_from_id: Option, source: SessionSource, + thread_source: Option, base_instructions: BaseInstructions, dynamic_tools: Vec, event_persistence_mode: EventPersistenceMode, @@ -164,6 +166,7 @@ impl RolloutRecorderParams { conversation_id: ThreadId, forked_from_id: Option, source: SessionSource, + thread_source: Option, base_instructions: BaseInstructions, dynamic_tools: Vec, event_persistence_mode: EventPersistenceMode, @@ -172,6 +175,7 @@ impl RolloutRecorderParams { conversation_id, forked_from_id, source, + thread_source, base_instructions, dynamic_tools, event_persistence_mode, @@ -667,6 +671,7 @@ impl RolloutRecorder { conversation_id, forked_from_id, source, + thread_source, base_instructions, dynamic_tools, event_persistence_mode, @@ -695,6 +700,7 @@ impl RolloutRecorder { agent_role: source.get_agent_role(), agent_path: source.get_agent_path().map(Into::into), source, + thread_source, model_provider: Some(config.model_provider_id().to_string()), base_instructions: Some(base_instructions), dynamic_tools: if dynamic_tools.is_empty() { diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 5711c47bad..35018b657d 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -91,6 +91,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> { originator: "test".to_string(), cli_version: "test".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -306,6 +307,7 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< thread_id, /*forked_from_id*/ None, SessionSource::Exec, + /*thread_source*/ None, BaseInstructions::default(), Vec::new(), EventPersistenceMode::Limited, @@ -386,6 +388,7 @@ async fn persist_reports_filesystem_error_and_retries_buffered_items() -> std::i thread_id, /*forked_from_id*/ None, SessionSource::Exec, + /*thread_source*/ None, BaseInstructions::default(), Vec::new(), EventPersistenceMode::Limited, @@ -485,6 +488,7 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu thread_id, /*forked_from_id*/ None, SessionSource::Cli, + /*thread_source*/ None, BaseInstructions::default(), Vec::new(), EventPersistenceMode::Limited, diff --git a/codex-rs/rollout/src/session_index_tests.rs b/codex-rs/rollout/src/session_index_tests.rs index fbef7eb4f9..757b08b4d4 100644 --- a/codex-rs/rollout/src/session_index_tests.rs +++ b/codex-rs/rollout/src/session_index_tests.rs @@ -32,6 +32,7 @@ fn write_rollout_with_metadata(path: &Path, thread_id: ThreadId) -> std::io::Res originator: "test_originator".into(), cli_version: "test_version".into(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index b5c2790dae..fdfed3dadc 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -1214,6 +1214,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { originator: "test_originator".into(), cli_version: "test_version".into(), source: SessionSource::VSCode, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/state/migrations/0030_threads_thread_source.sql b/codex-rs/state/migrations/0030_threads_thread_source.sql new file mode 100644 index 0000000000..4f11c9a3f8 --- /dev/null +++ b/codex-rs/state/migrations/0030_threads_thread_source.sql @@ -0,0 +1 @@ +ALTER TABLE threads ADD COLUMN thread_source TEXT; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index dcb730d6c6..723c5084eb 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -48,6 +48,7 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi } metadata.id = meta_line.meta.id; metadata.source = enum_to_string(&meta_line.meta.source); + metadata.thread_source = meta_line.meta.thread_source; metadata.agent_nickname = meta_line.meta.agent_nickname.clone(); metadata.agent_role = meta_line.meta.agent_role.clone(); metadata.agent_path = meta_line.meta.agent_path.clone(); @@ -249,6 +250,7 @@ mod tests { originator: "codex_cli_rs".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -382,6 +384,7 @@ mod tests { originator: "codex_cli_rs".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -408,6 +411,7 @@ mod tests { created_at, updated_at: created_at, source: "cli".to_string(), + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index bddb2fb364..e5e2d1d1f8 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -6,6 +6,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadSource; use sqlx::Row; use sqlx::sqlite::SqliteRow; use std::path::PathBuf; @@ -68,6 +69,8 @@ pub struct ThreadMetadata { pub updated_at: DateTime, /// The session source (stringified enum). pub source: String, + /// Optional analytics source classification for this thread. + pub thread_source: Option, /// Optional random unique nickname assigned to an AgentControl-spawned sub-agent. pub agent_nickname: Option, /// Optional role (agent_role) assigned to an AgentControl-spawned sub-agent. @@ -117,6 +120,8 @@ pub struct ThreadMetadataBuilder { pub updated_at: Option>, /// The session source. pub source: SessionSource, + /// Optional analytics source classification for this thread. + pub thread_source: Option, /// Optional random unique nickname assigned to the session. pub agent_nickname: Option, /// Optional role (agent_role) assigned to the session. @@ -157,6 +162,7 @@ impl ThreadMetadataBuilder { created_at, updated_at: None, source, + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, @@ -188,6 +194,7 @@ impl ThreadMetadataBuilder { created_at, updated_at, source, + thread_source: self.thread_source, agent_nickname: self.agent_nickname.clone(), agent_role: self.agent_role.clone(), agent_path: self @@ -313,6 +320,7 @@ pub(crate) struct ThreadRow { created_at: i64, updated_at: i64, source: String, + thread_source: Option, agent_nickname: Option, agent_role: Option, agent_path: Option, @@ -340,6 +348,7 @@ impl ThreadRow { created_at: row.try_get("created_at")?, updated_at: row.try_get("updated_at")?, source: row.try_get("source")?, + thread_source: row.try_get("thread_source")?, agent_nickname: row.try_get("agent_nickname")?, agent_role: row.try_get("agent_role")?, agent_path: row.try_get("agent_path")?, @@ -371,6 +380,7 @@ impl TryFrom for ThreadMetadata { created_at, updated_at, source, + thread_source, agent_nickname, agent_role, agent_path, @@ -389,12 +399,17 @@ impl TryFrom for ThreadMetadata { git_branch, git_origin_url, } = row; + let thread_source = thread_source + .map(|thread_source| thread_source.parse()) + .transpose() + .map_err(anyhow::Error::msg)?; Ok(Self { id: ThreadId::try_from(id)?, rollout_path: PathBuf::from(rollout_path), created_at: epoch_millis_to_datetime(created_at)?, updated_at: epoch_millis_to_datetime(updated_at)?, source, + thread_source, agent_nickname, agent_role, agent_path, @@ -480,6 +495,7 @@ mod tests { created_at: 1_700_000_000, updated_at: 1_700_000_100, source: "cli".to_string(), + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, @@ -508,6 +524,7 @@ mod tests { created_at: DateTime::::from_timestamp(1_700_000_000, 0).expect("timestamp"), updated_at: DateTime::::from_timestamp(1_700_000_100, 0).expect("timestamp"), source: "cli".to_string(), + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 5b75225b1e..186f2dd341 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -137,6 +137,7 @@ SELECT threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, threads.source, + threads.thread_source, threads.agent_path, threads.agent_nickname, threads.agent_role, diff --git a/codex-rs/state/src/runtime/test_support.rs b/codex-rs/state/src/runtime/test_support.rs index 5f07336853..aa1785ba7d 100644 --- a/codex-rs/state/src/runtime/test_support.rs +++ b/codex-rs/state/src/runtime/test_support.rs @@ -48,6 +48,7 @@ pub(super) fn test_thread_metadata( created_at: now, updated_at: now, source: "cli".to_string(), + thread_source: None, agent_nickname: None, agent_role: None, agent_path: None, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 5188bc3bc0..1795a864d8 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -13,6 +13,7 @@ SELECT threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, threads.source, + threads.thread_source, threads.agent_nickname, threads.agent_role, threads.agent_path, @@ -486,6 +487,7 @@ INSERT INTO threads ( created_at_ms, updated_at_ms, source, + thread_source, agent_nickname, agent_role, agent_path, @@ -505,7 +507,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING "#, ) @@ -516,6 +518,11 @@ ON CONFLICT(id) DO NOTHING .bind(datetime_to_epoch_millis(metadata.created_at)) .bind(datetime_to_epoch_millis(updated_at)) .bind(metadata.source.as_str()) + .bind( + metadata + .thread_source + .map(codex_protocol::protocol::ThreadSource::as_str), + ) .bind(metadata.agent_nickname.as_deref()) .bind(metadata.agent_role.as_deref()) .bind(metadata.agent_path.as_deref()) @@ -683,6 +690,7 @@ INSERT INTO threads ( created_at_ms, updated_at_ms, source, + thread_source, agent_nickname, agent_role, agent_path, @@ -702,7 +710,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET rollout_path = excluded.rollout_path, created_at = excluded.created_at, @@ -710,6 +718,7 @@ ON CONFLICT(id) DO UPDATE SET created_at_ms = excluded.created_at_ms, updated_at_ms = excluded.updated_at_ms, source = excluded.source, + thread_source = excluded.thread_source, agent_nickname = excluded.agent_nickname, agent_role = excluded.agent_role, agent_path = excluded.agent_path, @@ -737,6 +746,11 @@ ON CONFLICT(id) DO UPDATE SET .bind(datetime_to_epoch_millis(metadata.created_at)) .bind(datetime_to_epoch_millis(updated_at)) .bind(metadata.source.as_str()) + .bind( + metadata + .thread_source + .map(codex_protocol::protocol::ThreadSource::as_str), + ) .bind(metadata.agent_nickname.as_deref()) .bind(metadata.agent_role.as_deref()) .bind(metadata.agent_path.as_deref()) @@ -958,6 +972,7 @@ SELECT threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, threads.source, + threads.thread_source, threads.agent_nickname, threads.agent_role, threads.agent_path, @@ -1361,6 +1376,7 @@ mod tests { originator: String::new(), cli_version: String::new(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, @@ -1419,6 +1435,7 @@ mod tests { originator: String::new(), cli_version: String::new(), source: SessionSource::Cli, + thread_source: None, agent_path: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index c54ecb4af2..fca3d21e62 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -272,6 +272,7 @@ fn stored_thread_from_state( cwd: PathBuf::new(), cli_version: "test".to_string(), source: created.source.clone(), + thread_source: created.thread_source, agent_nickname: None, agent_role: None, agent_path: None, diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index cb0f64ea9c..44938e6e8e 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -34,6 +34,7 @@ pub(super) async fn create_thread( params.thread_id, params.forked_from_id, params.source, + params.thread_source, params.base_instructions, params.dynamic_tools, event_persistence_mode(params.event_persistence_mode), diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index 0cbf94da8c..bb46287123 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -130,6 +130,7 @@ pub(super) fn stored_thread_from_rollout_item( cwd: item.cwd.unwrap_or_default(), cli_version: item.cli_version.unwrap_or_default(), source, + thread_source: None, agent_nickname: item.agent_nickname, agent_role: item.agent_role, agent_path: None, diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index e6ea63bba1..8872442073 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -744,6 +744,7 @@ mod tests { thread_id, forked_from_id: None, source: SessionSource::Exec, + thread_source: None, base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), metadata: thread_metadata(), diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index 5bcf411beb..dd6e8494ab 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -274,10 +274,11 @@ async fn stored_thread_from_sqlite_metadata( .ok() .flatten(), }; - let forked_from_id = read_session_meta_line(metadata.rollout_path.as_path()) + let session_meta = read_session_meta_line(metadata.rollout_path.as_path()) .await .ok() - .and_then(|meta_line| meta_line.meta.forked_from_id); + .map(|meta_line| meta_line.meta); + let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id); StoredThread { thread_id: metadata.id, rollout_path: Some(metadata.rollout_path), @@ -297,6 +298,7 @@ async fn stored_thread_from_sqlite_metadata( cwd: metadata.cwd, cli_version: metadata.cli_version, source: parse_session_source(&metadata.source), + thread_source: metadata.thread_source, agent_nickname: metadata.agent_nickname, agent_role: metadata.agent_role, agent_path: metadata.agent_path, @@ -362,6 +364,7 @@ fn stored_thread_from_meta_line( cwd: meta_line.meta.cwd, cli_version: meta_line.meta.cli_version, source: meta_line.meta.source, + thread_source: meta_line.meta.thread_source, agent_nickname: meta_line.meta.agent_nickname, agent_role: meta_line.meta.agent_role, agent_path: meta_line.meta.agent_path, diff --git a/codex-rs/thread-store/src/remote/helpers.rs b/codex-rs/thread-store/src/remote/helpers.rs index 74b3ac7763..3322ecd142 100644 --- a/codex-rs/thread-store/src/remote/helpers.rs +++ b/codex-rs/thread-store/src/remote/helpers.rs @@ -16,6 +16,7 @@ use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::ThreadMemoryMode; +use codex_protocol::protocol::ThreadSource; use super::proto; use crate::GitInfoPatch; @@ -296,6 +297,11 @@ pub(super) fn stored_thread_from_proto( cwd: PathBuf::from(thread.cwd), cli_version: thread.cli_version, source, + thread_source: thread + .thread_source + .map(|thread_source| thread_source.parse::()) + .transpose() + .map_err(|error| ThreadStoreError::Internal { message: error })?, agent_nickname: thread.agent_nickname, agent_role: thread.agent_role, agent_path: thread.agent_path, @@ -340,6 +346,7 @@ pub(super) fn stored_thread_to_proto(thread: StoredThread) -> proto::StoredThrea cwd: thread.cwd.to_string_lossy().into_owned(), cli_version: thread.cli_version, source: Some(proto_session_source(&thread.source)), + thread_source: thread.thread_source.map(|source| source.to_string()), git_info: thread.git_info.map(git_info_to_proto), agent_nickname: thread.agent_nickname, agent_role: thread.agent_role, diff --git a/codex-rs/thread-store/src/remote/list_threads.rs b/codex-rs/thread-store/src/remote/list_threads.rs index 7fb0da6dc3..cf562497f4 100644 --- a/codex-rs/thread-store/src/remote/list_threads.rs +++ b/codex-rs/thread-store/src/remote/list_threads.rs @@ -140,6 +140,7 @@ mod tests { kind: proto::SessionSourceKind::Cli.into(), ..Default::default() }), + thread_source: Some("user".to_string()), git_info: Some(proto::GitInfo { sha: Some("abc123".to_string()), branch: Some("main".to_string()), @@ -250,6 +251,7 @@ mod tests { sub_agent_role: Some("explorer".to_string()), ..Default::default() }), + thread_source: Some("subagent".to_string()), git_info: Some(proto::GitInfo { sha: Some("abc123".to_string()), branch: Some("main".to_string()), diff --git a/codex-rs/thread-store/src/remote/mod.rs b/codex-rs/thread-store/src/remote/mod.rs index 3e74a45f4b..013b74c933 100644 --- a/codex-rs/thread-store/src/remote/mod.rs +++ b/codex-rs/thread-store/src/remote/mod.rs @@ -358,6 +358,7 @@ mod tests { thread_id: ThreadId::new(), forked_from_id: None, source: SessionSource::Exec, + thread_source: None, base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), metadata: metadata.clone(), diff --git a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto index 7c797f139a..a5755afee0 100644 --- a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto +++ b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto @@ -132,6 +132,7 @@ message StoredThread { optional string sandbox_policy_json = 21; optional string token_usage_json = 22; optional StoredThreadHistory history = 23; + optional string thread_source = 24; } message SessionSource { diff --git a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs index a210ef8766..c5b8fdc1d6 100644 --- a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs +++ b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs @@ -168,6 +168,8 @@ pub struct StoredThread { pub token_usage_json: ::core::option::Option<::prost::alloc::string::String>, #[prost(message, optional, tag = "23")] pub history: ::core::option::Option, + #[prost(string, optional, tag = "24")] + pub thread_source: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct SessionSource { diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 85bde023bd..06aa2998c9 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -12,6 +12,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadMemoryMode as MemoryMode; +use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; use serde::Deserialize; use serde::Serialize; @@ -48,6 +49,8 @@ pub struct CreateThreadParams { pub forked_from_id: Option, /// Runtime source for the thread. pub source: SessionSource, + /// Optional analytics source classification for this thread. + pub thread_source: Option, /// Base instructions persisted in session metadata. pub base_instructions: BaseInstructions, /// Dynamic tools available to the thread at startup. @@ -211,6 +214,8 @@ pub struct StoredThread { pub cli_version: String, /// Runtime source for the thread. pub source: SessionSource, + /// Optional analytics source classification for this thread. + pub thread_source: Option, /// Optional random nickname for thread-spawn sub-agents. pub agent_nickname: Option, /// Optional role for thread-spawn sub-agents. diff --git a/codex-rs/tui/src/app/loaded_threads.rs b/codex-rs/tui/src/app/loaded_threads.rs index c98a54180c..49e5568596 100644 --- a/codex-rs/tui/src/app/loaded_threads.rs +++ b/codex-rs/tui/src/app/loaded_threads.rs @@ -129,6 +129,7 @@ mod tests { cwd: test_path_buf("/tmp").abs(), cli_version: "0.0.0".to_string(), source, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/tui/src/app/tests.rs b/codex-rs/tui/src/app/tests.rs index 0e59964c3b..bb00b53533 100644 --- a/codex-rs/tui/src/app/tests.rs +++ b/codex-rs/tui/src/app/tests.rs @@ -2871,6 +2871,7 @@ async fn inactive_thread_started_notification_initializes_replay_session() -> Re cwd: test_path_buf("/tmp/agent").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Unknown, + thread_source: None, agent_nickname: Some("Robie".to_string()), agent_role: Some("explorer".to_string()), git_info: None, @@ -2952,6 +2953,7 @@ async fn inactive_thread_started_notification_preserves_primary_model_when_path_ cwd: test_path_buf("/tmp/agent").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Unknown, + thread_source: None, agent_nickname: Some("Robie".to_string()), agent_role: Some("explorer".to_string()), git_info: None, @@ -3006,6 +3008,7 @@ async fn thread_read_session_state_does_not_reuse_primary_permission_profile() { cwd: test_path_buf("/tmp/read").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Unknown, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -4969,6 +4972,7 @@ async fn thread_rollback_response_discards_queued_active_thread_events() { cwd: test_path_buf("/tmp/project").abs(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/tui/src/app/thread_session_state.rs b/codex-rs/tui/src/app/thread_session_state.rs index 3a898b82a3..ac6f6311ca 100644 --- a/codex-rs/tui/src/app/thread_session_state.rs +++ b/codex-rs/tui/src/app/thread_session_state.rs @@ -333,6 +333,7 @@ mod tests { cwd: test_path_buf("/tmp/read").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Unknown, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/tui/src/app_server_session.rs b/codex-rs/tui/src/app_server_session.rs index 67e4869868..9aafe9aed3 100644 --- a/codex-rs/tui/src/app_server_session.rs +++ b/codex-rs/tui/src/app_server_session.rs @@ -1826,6 +1826,7 @@ mod tests { cwd: test_path_buf("/tmp/project").abs(), cli_version: "0.0.0".to_string(), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 06ad0a61a7..171a1c3513 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -5695,6 +5695,7 @@ session_picker_view = "dense" cwd: test_path_buf("/tmp").abs(), cli_version: String::from("0.0.0"), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -5727,6 +5728,7 @@ session_picker_view = "dense" cwd: test_path_buf("/tmp").abs(), cli_version: String::from("0.0.0"), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -5792,6 +5794,7 @@ session_picker_view = "dense" cwd: test_path_buf("/tmp").abs(), cli_version: String::from("0.0.0"), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None, @@ -5847,6 +5850,7 @@ session_picker_view = "dense" cwd: test_path_buf("/tmp").abs(), cli_version: String::from("0.0.0"), source: codex_app_server_protocol::SessionSource::Cli, + thread_source: None, agent_nickname: None, agent_role: None, git_info: None,