event rename and resume support

This commit is contained in:
Roy Han
2026-03-25 10:27:05 -07:00
parent 7eebd0281f
commit 08a8c84453
4 changed files with 194 additions and 101 deletions

View File

@@ -35,7 +35,7 @@ pub(crate) struct TrackEventsContext {
}
#[derive(Clone)]
pub(crate) struct CodexThreadStartedEvent {
pub(crate) struct CodexThreadInitializedEvent {
pub(crate) thread_id: String,
pub(crate) model: String,
pub(crate) model_provider: String,
@@ -136,8 +136,8 @@ impl AnalyticsEventsQueue {
TrackEventsJob::SkillInvocations(job) => {
send_track_skill_invocations(&auth_manager, job).await;
}
TrackEventsJob::ThreadStarted(job) => {
send_track_thread_started(&auth_manager, job).await;
TrackEventsJob::ThreadInitialized(job) => {
send_track_thread_initialized(&auth_manager, job).await;
}
TrackEventsJob::AppMentioned(job) => {
send_track_app_mentioned(&auth_manager, job).await;
@@ -231,8 +231,8 @@ impl AnalyticsEventsClient {
);
}
pub(crate) fn track_thread_started(&self, thread_event: CodexThreadStartedEvent) {
track_thread_started(&self.queue, Arc::clone(&self.config), thread_event);
pub(crate) fn track_thread_initialized(&self, thread_event: CodexThreadInitializedEvent) {
track_thread_initialized(&self.queue, Arc::clone(&self.config), thread_event);
}
pub(crate) fn track_app_mentioned(
@@ -317,7 +317,7 @@ impl AnalyticsEventsClient {
enum TrackEventsJob {
SkillInvocations(TrackSkillInvocationsJob),
ThreadStarted(TrackThreadStartedJob),
ThreadInitialized(TrackThreadInitializedJob),
AppMentioned(TrackAppMentionedJob),
AppUsed(TrackAppUsedJob),
TurnEvent(TrackTurnEventJob),
@@ -334,9 +334,9 @@ struct TrackSkillInvocationsJob {
invocations: Vec<SkillInvocation>,
}
struct TrackThreadStartedJob {
struct TrackThreadInitializedJob {
config: Arc<Config>,
thread_event: CodexThreadStartedEvent,
thread_event: CodexThreadInitializedEvent,
}
struct TrackAppMentionedJob {
@@ -389,7 +389,7 @@ struct TrackEventsRequest {
#[serde(untagged)]
enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
ThreadStarted(CodexThreadStartedEventRequest),
ThreadInitialized(CodexThreadInitializedEventRequest),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
TurnEvent(CodexTurnEventRequest),
@@ -419,7 +419,7 @@ struct SkillInvocationEventParams {
}
#[derive(Serialize)]
struct CodexThreadStartedEventParams {
struct CodexThreadInitializedEventParams {
thread_id: String,
product_client_id: String,
model: String,
@@ -442,9 +442,9 @@ struct CodexThreadStartedEventParams {
}
#[derive(Serialize)]
struct CodexThreadStartedEventRequest {
struct CodexThreadInitializedEventRequest {
event_type: &'static str,
event_params: CodexThreadStartedEventParams,
event_params: CodexThreadInitializedEventParams,
}
#[derive(Serialize)]
@@ -552,15 +552,15 @@ pub(crate) fn track_skill_invocations(
queue.try_send(job);
}
pub(crate) fn track_thread_started(
pub(crate) fn track_thread_initialized(
queue: &AnalyticsEventsQueue,
config: Arc<Config>,
thread_event: CodexThreadStartedEvent,
thread_event: CodexThreadInitializedEvent,
) {
if config.analytics_enabled == Some(false) {
return;
}
let job = TrackEventsJob::ThreadStarted(TrackThreadStartedJob {
let job = TrackEventsJob::ThreadInitialized(TrackThreadInitializedJob {
config,
thread_event,
});
@@ -723,15 +723,15 @@ async fn send_track_skill_invocations(auth_manager: &AuthManager, job: TrackSkil
send_track_events(auth_manager, config, events).await;
}
async fn send_track_thread_started(auth_manager: &AuthManager, job: TrackThreadStartedJob) {
let TrackThreadStartedJob {
async fn send_track_thread_initialized(auth_manager: &AuthManager, job: TrackThreadInitializedJob) {
let TrackThreadInitializedJob {
config,
thread_event,
} = job;
let events = vec![TrackEventRequest::ThreadStarted(
CodexThreadStartedEventRequest {
event_type: "codex_thread_started",
event_params: codex_thread_started_event_params(thread_event),
let events = vec![TrackEventRequest::ThreadInitialized(
CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(thread_event),
},
)];
@@ -909,10 +909,10 @@ fn personality_mode(personality: Option<Personality>) -> Option<String> {
}
}
fn codex_thread_started_event_params(
thread_event: CodexThreadStartedEvent,
) -> CodexThreadStartedEventParams {
CodexThreadStartedEventParams {
fn codex_thread_initialized_event_params(
thread_event: CodexThreadInitializedEvent,
) -> CodexThreadInitializedEventParams {
CodexThreadInitializedEventParams {
thread_id: thread_event.thread_id,
product_client_id: crate::default_client::originator().value,
model: thread_event.model,

View File

@@ -4,15 +4,11 @@ use super::CodexAppMentionedEventRequest;
use super::CodexAppUsedEventRequest;
use super::CodexPluginEventRequest;
use super::CodexPluginUsedEventRequest;
use super::CodexThreadStartedEvent;
use super::CodexThreadStartedEventRequest;
<<<<<<< HEAD
use super::CodexThreadInitializedEvent;
use super::CodexThreadInitializedEventRequest;
use super::CodexTurnEvent;
use super::CodexTurnEventRequest;
use super::InitialHistoryType;
=======
use super::InitializationMode;
>>>>>>> 74b8b800c (session source update)
use super::InvocationType;
use super::SubmissionType;
use super::TrackEventRequest;
@@ -20,7 +16,7 @@ use super::TrackEventsContext;
use super::codex_app_metadata;
use super::codex_plugin_metadata;
use super::codex_plugin_used_metadata;
use super::codex_thread_started_event_params;
use super::codex_thread_initialized_event_params;
use super::codex_turn_event_params;
use super::normalize_path_for_skill_id;
use crate::plugins::AppConnectorId;
@@ -266,10 +262,10 @@ fn turn_event_serializes_expected_shape() {
}
#[test]
fn thread_started_event_serializes_expected_shape() {
let event = TrackEventRequest::ThreadStarted(CodexThreadStartedEventRequest {
event_type: "codex_thread_started",
event_params: codex_thread_started_event_params(CodexThreadStartedEvent {
fn thread_initialized_event_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(CodexThreadInitializedEvent {
thread_id: "thread-0".to_string(),
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
@@ -291,12 +287,12 @@ fn thread_started_event_serializes_expected_shape() {
}),
});
let payload = serde_json::to_value(&event).expect("serialize thread started event");
let payload = serde_json::to_value(&event).expect("serialize thread initialized event");
assert_eq!(
payload,
json!({
"event_type": "codex_thread_started",
"event_type": "codex_thread_initialized",
"event_params": {
"thread_id": "thread-0",
"product_client_id": crate::default_client::originator().value,
@@ -323,10 +319,10 @@ fn thread_started_event_serializes_expected_shape() {
}
#[test]
fn thread_started_event_serializes_subagent_source() {
let event = TrackEventRequest::ThreadStarted(CodexThreadStartedEventRequest {
event_type: "codex_thread_started",
event_params: codex_thread_started_event_params(CodexThreadStartedEvent {
fn thread_initialized_event_serializes_subagent_source() {
let event = TrackEventRequest::ThreadInitialized(CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(CodexThreadInitializedEvent {
thread_id: "thread-1".to_string(),
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
@@ -348,16 +344,17 @@ fn thread_started_event_serializes_subagent_source() {
}),
});
let payload = serde_json::to_value(&event).expect("serialize subagent thread started event");
let payload =
serde_json::to_value(&event).expect("serialize subagent thread initialized event");
assert_eq!(payload["event_params"]["session_source"], "subagent");
assert_eq!(payload["event_params"]["subagent_source"], "review");
}
#[test]
fn thread_started_event_omits_non_user_non_subagent_session_source() {
let event = TrackEventRequest::ThreadStarted(CodexThreadStartedEventRequest {
event_type: "codex_thread_started",
event_params: codex_thread_started_event_params(CodexThreadStartedEvent {
fn thread_initialized_event_omits_non_user_non_subagent_session_source() {
let event = TrackEventRequest::ThreadInitialized(CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(CodexThreadInitializedEvent {
thread_id: "thread-2".to_string(),
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
@@ -379,7 +376,7 @@ fn thread_started_event_omits_non_user_non_subagent_session_source() {
}),
});
let payload = serde_json::to_value(&event).expect("serialize mcp thread started event");
let payload = serde_json::to_value(&event).expect("serialize mcp thread initialized event");
assert_eq!(
payload["event_params"]["session_source"],
serde_json::Value::Null

View File

@@ -16,9 +16,9 @@ use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::AppInvocation;
use crate::analytics_client::CodexThreadStartedEvent;
use crate::analytics_client::CodexThreadInitializedEvent;
use crate::analytics_client::CodexTurnEvent;
use crate::analytics_client::InitialHistoryType;
use crate::analytics_client::InitializationMode;
use crate::analytics_client::InvocationType;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
@@ -634,11 +634,9 @@ impl Codex {
user_shell_override,
};
let should_emit_thread_started =
!matches!(conversation_history, InitialHistory::Resumed(_));
let initial_history_type = initial_history_type(&conversation_history);
let initialization_mode = initialization_mode(&conversation_history);
let thread_session_source = session_configuration.session_source.clone();
let thread_started_configuration = session_configuration.clone();
let thread_initialized_configuration = session_configuration.clone();
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
@@ -667,46 +665,47 @@ impl Codex {
})?;
let thread_id = session.conversation_id;
if should_emit_thread_started {
session
.services
.analytics_events_client
.track_thread_started(CodexThreadStartedEvent {
thread_id: thread_id.to_string(),
model: thread_started_configuration
.collaboration_mode
.model()
.to_string(),
model_provider: thread_started_configuration
.original_config_do_not_use
.model_provider_id
.clone(),
reasoning_effort: thread_started_configuration
.collaboration_mode
.reasoning_effort(),
reasoning_summary: thread_started_configuration.model_reasoning_summary,
service_tier: thread_started_configuration.service_tier,
approval_policy: thread_started_configuration.approval_policy.value(),
approvals_reviewer: thread_started_configuration.approvals_reviewer,
sandbox_policy: thread_started_configuration.sandbox_policy.get().clone(),
sandbox_network_access: thread_started_configuration
.network_sandbox_policy
.is_enabled(),
collaboration_mode: thread_started_configuration.collaboration_mode.mode,
personality: thread_started_configuration.personality,
ephemeral: thread_started_configuration
.original_config_do_not_use
.ephemeral,
initial_history_type,
subagent_source: session_source_subagent_source(&thread_session_source),
parent_thread_id: session_source_parent_thread_id(&thread_session_source),
session_source: thread_session_source,
created_at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
});
}
session
.services
.analytics_events_client
.track_thread_initialized(CodexThreadInitializedEvent {
thread_id: thread_id.to_string(),
model: thread_initialized_configuration
.collaboration_mode
.model()
.to_string(),
model_provider: thread_initialized_configuration
.original_config_do_not_use
.model_provider_id
.clone(),
reasoning_effort: thread_initialized_configuration
.collaboration_mode
.reasoning_effort(),
reasoning_summary: thread_initialized_configuration.model_reasoning_summary,
service_tier: thread_initialized_configuration.service_tier,
approval_policy: thread_initialized_configuration.approval_policy.value(),
approvals_reviewer: thread_initialized_configuration.approvals_reviewer,
sandbox_policy: thread_initialized_configuration
.sandbox_policy
.get()
.clone(),
sandbox_network_access: thread_initialized_configuration
.network_sandbox_policy
.is_enabled(),
collaboration_mode: thread_initialized_configuration.collaboration_mode.mode,
personality: thread_initialized_configuration.personality,
ephemeral: thread_initialized_configuration
.original_config_do_not_use
.ephemeral,
initialization_mode,
subagent_source: session_source_subagent_source(&thread_session_source),
parent_thread_id: session_source_parent_thread_id(&thread_session_source),
session_source: thread_session_source,
created_at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
});
// This task will run until Op::Shutdown is received.
let session_for_loop = Arc::clone(&session);
@@ -822,11 +821,11 @@ impl Codex {
}
}
fn initial_history_type(conversation_history: &InitialHistory) -> InitialHistoryType {
fn initialization_mode(conversation_history: &InitialHistory) -> InitializationMode {
match conversation_history {
InitialHistory::New => InitialHistoryType::New,
InitialHistory::Forked(_) => InitialHistoryType::Forked,
InitialHistory::Resumed(_) => InitialHistoryType::Resumed,
InitialHistory::New => InitializationMode::New,
InitialHistory::Forked(_) => InitializationMode::Forked,
InitialHistory::Resumed(_) => InitializationMode::Resumed,
}
}

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::config::Constrained;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
@@ -12,7 +13,7 @@ use std::time::Duration;
use std::time::Instant;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_start_tracks_thread_started_analytics() -> Result<()> {
async fn thread_initialization_tracks_thread_initialized_analytics() -> Result<()> {
let server = start_mock_server().await;
let chatgpt_base_url = server.uri();
@@ -55,8 +56,8 @@ async fn thread_start_tracks_thread_started_analytics() -> Result<()> {
.as_array()
.expect("events array")
.iter()
.find(|event| event["event_type"] == "codex_thread_started")
.expect("codex_thread_started event should be present");
.find(|event| event["event_type"] == "codex_thread_initialized")
.expect("codex_thread_initialized event should be present");
assert_eq!(
event["event_params"]["thread_id"],
@@ -99,3 +100,99 @@ async fn thread_start_tracks_thread_started_analytics() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resumed_thread_emits_thread_initialized_analytics() -> Result<()> {
let server = start_mock_server().await;
let chatgpt_base_url = server.uri();
let initial = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config({
let chatgpt_base_url = chatgpt_base_url.clone();
move |config| {
config.chatgpt_base_url = chatgpt_base_url;
config.model = Some("gpt-5".to_string());
config.model_reasoning_effort = Some(ReasoningEffort::High);
config.model_reasoning_summary = Some(ReasoningSummary::Detailed);
config.service_tier = Some(ServiceTier::Flex);
config.approvals_reviewer = ApprovalsReviewer::GuardianSubagent;
config.permissions.sandbox_policy = Constrained::allow_any(
codex_protocol::protocol::SandboxPolicy::new_workspace_write_policy(),
);
config.personality = Some(Personality::Friendly);
config.ephemeral = true;
}
})
.build(&server)
.await?;
let rollout_path = find_thread_path_by_id_str(
initial.codex_home_path(),
&initial.session_configured.session_id.to_string(),
)
.await?
.expect("rollout path for initial thread");
let _resumed = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.chatgpt_base_url = chatgpt_base_url;
config.model = Some("gpt-5".to_string());
config.model_reasoning_effort = Some(ReasoningEffort::High);
config.model_reasoning_summary = Some(ReasoningSummary::Detailed);
config.service_tier = Some(ServiceTier::Flex);
config.approvals_reviewer = ApprovalsReviewer::GuardianSubagent;
config.permissions.sandbox_policy = Constrained::allow_any(
codex_protocol::protocol::SandboxPolicy::new_workspace_write_policy(),
);
config.personality = Some(Personality::Friendly);
config.ephemeral = true;
})
.resume(&server, initial.home.clone(), rollout_path)
.await?;
let deadline = Instant::now() + Duration::from_secs(10);
let analytics_request = loop {
let requests = server.received_requests().await.unwrap_or_default();
if let Some(request) = requests.into_iter().find(|request| {
if request.url.path() != "/codex/analytics-events/events" {
return false;
}
let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&request.body) else {
return false;
};
payload["events"]
.as_array()
.into_iter()
.flatten()
.any(|event| {
event["event_type"] == "codex_thread_initialized"
&& event["event_params"]["initialization_mode"] == "resumed"
})
}) {
break request;
}
if Instant::now() >= deadline {
panic!("timed out waiting for resumed thread analytics request");
}
tokio::time::sleep(Duration::from_millis(50)).await;
};
let payload: serde_json::Value =
serde_json::from_slice(&analytics_request.body).expect("analytics payload");
let event = payload["events"]
.as_array()
.expect("events array")
.iter()
.find(|event| {
event["event_type"] == "codex_thread_initialized"
&& event["event_params"]["initialization_mode"] == "resumed"
})
.expect("codex_thread_initialized resumed event should be present");
assert_eq!(event["event_params"]["session_source"], "user");
assert_eq!(event["event_params"]["initialization_mode"], "resumed");
Ok(())
}