diff --git a/codex-rs/analytics/src/accepted_line_events.rs b/codex-rs/analytics/src/accepted_line_events.rs new file mode 100644 index 0000000000..9e9bcb2419 --- /dev/null +++ b/codex-rs/analytics/src/accepted_line_events.rs @@ -0,0 +1,112 @@ +use crate::accepted_lines::fingerprint_hash; +use crate::events::CodexAcceptedLineFingerprintsEventParams; +use crate::events::CodexAcceptedLineFingerprintsEventRequest; +use crate::events::TrackEventRequest; +use crate::facts::AcceptedLineFingerprint; +use codex_git_utils::canonicalize_git_remote_url; +use codex_git_utils::get_git_remote_urls_assume_git_repo; +use std::path::Path; + +const ACCEPTED_LINE_FINGERPRINT_EVENT_TARGET_BYTES: usize = 2 * 1024 * 1024; +const ACCEPTED_LINE_FINGERPRINT_EVENT_FIXED_BYTES: usize = 1024; + +pub(crate) struct AcceptedLineFingerprintEventInput { + pub(crate) event_type: &'static str, + pub(crate) turn_id: String, + pub(crate) thread_id: String, + pub(crate) product_surface: Option, + pub(crate) model_slug: Option, + pub(crate) completed_at: u64, + pub(crate) repo_hash: Option, + pub(crate) accepted_added_lines: u64, + pub(crate) accepted_deleted_lines: u64, + pub(crate) line_fingerprints: Vec, +} + +pub(crate) fn accepted_line_fingerprint_event_requests( + input: AcceptedLineFingerprintEventInput, +) -> Vec { + let chunks = accepted_line_fingerprint_chunks(input.line_fingerprints); + chunks + .into_iter() + .enumerate() + .map(|(index, line_fingerprints)| { + let is_first_chunk = index == 0; + TrackEventRequest::AcceptedLineFingerprints(Box::new( + CodexAcceptedLineFingerprintsEventRequest { + event_type: "codex_accepted_line_fingerprints", + event_params: CodexAcceptedLineFingerprintsEventParams { + event_type: input.event_type, + turn_id: input.turn_id.clone(), + thread_id: input.thread_id.clone(), + product_surface: input.product_surface.clone(), + model_slug: input.model_slug.clone(), + completed_at: input.completed_at, + repo_hash: input.repo_hash.clone(), + accepted_added_lines: if is_first_chunk { + input.accepted_added_lines + } else { + 0 + }, + accepted_deleted_lines: if is_first_chunk { + input.accepted_deleted_lines + } else { + 0 + }, + line_fingerprints, + }, + }, + )) + }) + .collect() +} + +pub async fn accepted_line_repo_hash_for_cwd(cwd: &Path) -> Option { + let remotes = get_git_remote_urls_assume_git_repo(cwd).await?; + remotes + .get("origin") + .or_else(|| remotes.values().next()) + .map(|remote_url| { + let canonical_remote_url = + canonicalize_git_remote_url(remote_url).unwrap_or_else(|| remote_url.to_string()); + fingerprint_hash("repo", &canonical_remote_url) + }) +} + +fn accepted_line_fingerprint_chunks( + line_fingerprints: Vec, +) -> Vec> { + if line_fingerprints.is_empty() { + return vec![Vec::new()]; + } + + let mut chunks = Vec::new(); + let mut current = Vec::new(); + let mut current_bytes = ACCEPTED_LINE_FINGERPRINT_EVENT_FIXED_BYTES; + + for fingerprint in line_fingerprints { + let item_bytes = accepted_line_fingerprint_json_bytes(&fingerprint); + let separator_bytes = usize::from(!current.is_empty()); + if !current.is_empty() + && current_bytes + separator_bytes + item_bytes + > ACCEPTED_LINE_FINGERPRINT_EVENT_TARGET_BYTES + { + chunks.push(current); + current = Vec::new(); + current_bytes = ACCEPTED_LINE_FINGERPRINT_EVENT_FIXED_BYTES; + } + current_bytes += usize::from(!current.is_empty()) + item_bytes; + current.push(fingerprint); + } + + if !current.is_empty() { + chunks.push(current); + } + chunks +} + +fn accepted_line_fingerprint_json_bytes(fingerprint: &AcceptedLineFingerprint) -> usize { + // {"path_hash":"...","line_hash":"..."} plus one byte of array comma + // accounted for by the caller when needed. + 32 + fingerprint.path_hash.len() + fingerprint.line_hash.len() +} diff --git a/codex-rs/analytics/src/accepted_lines.rs b/codex-rs/analytics/src/accepted_lines.rs new file mode 100644 index 0000000000..c7fdb6f3c7 --- /dev/null +++ b/codex-rs/analytics/src/accepted_lines.rs @@ -0,0 +1,188 @@ +use crate::facts::AcceptedLineFingerprint; +use sha1::Digest; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct AcceptedLineFingerprintSummary { + pub accepted_added_lines: u64, + pub accepted_deleted_lines: u64, + pub line_fingerprints: Vec, +} + +pub fn accepted_line_fingerprints_from_unified_diff( + unified_diff: &str, +) -> AcceptedLineFingerprintSummary { + let mut current_path: Option = None; + let mut in_hunk = false; + let mut accepted_added_lines = 0; + let mut accepted_deleted_lines = 0; + let mut line_fingerprints = Vec::new(); + + for line in unified_diff.lines() { + if line.starts_with("diff --git ") { + current_path = None; + in_hunk = false; + continue; + } + + if line.starts_with("@@ ") { + in_hunk = true; + continue; + } + + if !in_hunk && let Some(path) = line.strip_prefix("+++ ") { + current_path = normalize_diff_path(path); + continue; + } + + if !in_hunk && line.starts_with("--- ") { + continue; + } + + if let Some(added_line) = line.strip_prefix('+') { + accepted_added_lines += 1; + if let Some(path) = current_path.as_deref() + && let Some(normalized_line) = normalize_effective_line(added_line) + { + line_fingerprints.push(AcceptedLineFingerprint { + path_hash: fingerprint_hash("path", path), + line_hash: fingerprint_hash("line", &normalized_line), + }); + } + continue; + } + + if line.starts_with('-') { + accepted_deleted_lines += 1; + } + } + + AcceptedLineFingerprintSummary { + accepted_added_lines, + accepted_deleted_lines, + line_fingerprints, + } +} + +pub fn fingerprint_hash(domain: &str, value: &str) -> String { + let mut hasher = sha1::Sha1::new(); + hasher.update(b"file-line-v1\0"); + hasher.update(domain.as_bytes()); + hasher.update(b"\0"); + hasher.update(value.as_bytes()); + format!("{:x}", hasher.finalize()) +} + +fn normalize_diff_path(path: &str) -> Option { + let path = path.trim(); + if path == "/dev/null" { + return None; + } + + Some( + path.strip_prefix("b/") + .or_else(|| path.strip_prefix("a/")) + .unwrap_or(path) + .to_string(), + ) +} + +fn normalize_effective_line(line: &str) -> Option { + let normalized = line.split_whitespace().collect::>().join(" "); + if normalized.len() <= 3 { + return None; + } + if !normalized + .chars() + .any(|ch| ch.is_alphanumeric() || ch == '_') + { + return None; + } + Some(normalized) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_counts_and_effective_added_fingerprints() { + let diff = "\ +diff --git a/src/lib.rs b/src/lib.rs +index 1111111..2222222 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -1,3 +1,5 @@ +-old line ++fn useful() { ++} ++ return user.id; + context +"; + + let summary = accepted_line_fingerprints_from_unified_diff(diff); + + assert_eq!( + summary, + AcceptedLineFingerprintSummary { + accepted_added_lines: 3, + accepted_deleted_lines: 1, + line_fingerprints: vec![ + AcceptedLineFingerprint { + path_hash: fingerprint_hash("path", "src/lib.rs"), + line_hash: fingerprint_hash("line", "fn useful() {"), + }, + AcceptedLineFingerprint { + path_hash: fingerprint_hash("path", "src/lib.rs"), + line_hash: fingerprint_hash("line", "return user.id;"), + }, + ], + } + ); + } + + #[test] + fn skips_added_file_metadata_headers() { + let diff = "\ +diff --git a/new.py b/new.py +new file mode 100644 +index 0000000..1111111 +--- /dev/null ++++ b/new.py +@@ -0,0 +1 @@ ++print('hello') +"; + + let summary = accepted_line_fingerprints_from_unified_diff(diff); + + assert_eq!(summary.accepted_added_lines, 1); + assert_eq!(summary.accepted_deleted_lines, 0); + assert_eq!(summary.line_fingerprints.len(), 1); + } + + #[test] + fn parses_hunk_lines_that_look_like_file_headers() { + let diff = "\ +diff --git a/src/lib.rs b/src/lib.rs +index 1111111..2222222 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -1,2 +1,2 @@ +--- old value ++++ new value +"; + + let summary = accepted_line_fingerprints_from_unified_diff(diff); + + assert_eq!( + summary, + AcceptedLineFingerprintSummary { + accepted_added_lines: 1, + accepted_deleted_lines: 1, + line_fingerprints: vec![AcceptedLineFingerprint { + path_hash: fingerprint_hash("path", "src/lib.rs"), + line_hash: fingerprint_hash("line", "++ new value"), + }], + } + ); + } +} diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 880adfc254..462b89c64a 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -1,5 +1,7 @@ use crate::client::AnalyticsEventsQueue; use crate::events::AppServerRpcTransport; +use crate::events::CodexAcceptedLineFingerprintsEventParams; +use crate::events::CodexAcceptedLineFingerprintsEventRequest; use crate::events::CodexAppMentionedEventRequest; use crate::events::CodexAppServerClientMetadata; use crate::events::CodexAppUsedEventRequest; @@ -28,6 +30,7 @@ use crate::events::codex_hook_run_metadata; use crate::events::codex_plugin_metadata; use crate::events::codex_plugin_used_metadata; use crate::events::subagent_thread_started_event_request; +use crate::facts::AcceptedLineFingerprint; use crate::facts::AnalyticsFact; use crate::facts::AnalyticsJsonRpcError; use crate::facts::AppInvocation; @@ -89,6 +92,7 @@ use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnDiffUpdatedNotification; use codex_app_server_protocol::TurnError as AppServerTurnError; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartedNotification; @@ -827,6 +831,206 @@ fn app_used_event_serializes_expected_shape() { ); } +#[test] +fn accepted_line_fingerprints_event_serializes_expected_shape() { + let event = TrackEventRequest::AcceptedLineFingerprints(Box::new( + CodexAcceptedLineFingerprintsEventRequest { + event_type: "codex_accepted_line_fingerprints", + event_params: CodexAcceptedLineFingerprintsEventParams { + event_type: "codex.accepted_line_fingerprints", + turn_id: "turn-1".to_string(), + thread_id: "thread-1".to_string(), + product_surface: Some("codex".to_string()), + model_slug: Some("gpt-5.1-codex".to_string()), + completed_at: 1710000000, + repo_hash: Some("repo-hash-1".to_string()), + accepted_added_lines: 42, + accepted_deleted_lines: 40, + line_fingerprints: vec![AcceptedLineFingerprint { + path_hash: "path-hash-1".to_string(), + line_hash: "line-hash-1".to_string(), + }], + }, + }, + )); + + let payload = serde_json::to_value(&event).expect("serialize accepted line fingerprints event"); + + assert_eq!( + payload, + json!({ + "event_type": "codex_accepted_line_fingerprints", + "event_params": { + "event_type": "codex.accepted_line_fingerprints", + "turn_id": "turn-1", + "thread_id": "thread-1", + "product_surface": "codex", + "model_slug": "gpt-5.1-codex", + "completed_at": 1710000000, + "repo_hash": "repo-hash-1", + "accepted_added_lines": 42, + "accepted_deleted_lines": 40, + "line_fingerprints": [ + { + "path_hash": "path-hash-1", + "line_hash": "line-hash-1" + } + ] + } + }) + ); +} + +#[tokio::test] +async fn reducer_chunks_large_accepted_line_fingerprint_events_without_repeating_counts() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut events, + /*include_initialize*/ true, + /*include_resolved_config*/ true, + /*include_started*/ true, + /*include_token_usage*/ true, + ) + .await; + events.clear(); + + let mut diff = "\ +diff --git a/src/lib.rs b/src/lib.rs +index 1111111..2222222 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -0,0 +1,20000 @@ +" + .to_string(); + for index in 0..20_000 { + diff.push_str(&format!("+let value_{index} = {index};\n")); + } + + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::TurnDiffUpdated( + TurnDiffUpdatedNotification { + thread_id: "thread-2".to_string(), + turn_id: "turn-2".to_string(), + diff, + }, + ))), + &mut events, + ) + .await; + assert!(events.is_empty()); + + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut events, + ) + .await; + + let accepted_line_events = events + .iter() + .filter_map(|event| match event { + TrackEventRequest::AcceptedLineFingerprints(event) => Some(event), + _ => None, + }) + .collect::>(); + assert!(accepted_line_events.len() > 1); + let mut total_fingerprints = 0; + for (index, event) in accepted_line_events.iter().enumerate() { + assert_eq!(event.event_params.turn_id, "turn-2"); + assert_eq!(event.event_params.thread_id, "thread-2"); + total_fingerprints += event.event_params.line_fingerprints.len(); + if index == 0 { + assert_eq!(event.event_params.accepted_added_lines, 20_000); + assert_eq!(event.event_params.accepted_deleted_lines, 0); + } else { + assert_eq!(event.event_params.accepted_added_lines, 0); + assert_eq!(event.event_params.accepted_deleted_lines, 0); + } + assert!(serde_json::to_vec(event).expect("serialize chunk").len() < 2_100_000); + } + assert_eq!(total_fingerprints, 20_000); +} + +#[tokio::test] +async fn reducer_emits_accepted_line_fingerprints_once_from_latest_turn_diff_on_completion() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut events, + /*include_initialize*/ true, + /*include_resolved_config*/ true, + /*include_started*/ true, + /*include_token_usage*/ true, + ) + .await; + events.clear(); + + for line in ["let old_value = 1;", "let latest_value = 2;"] { + let diff = format!( + "\ +diff --git a/src/lib.rs b/src/lib.rs +index 1111111..2222222 +--- a/src/lib.rs ++++ b/src/lib.rs +@@ -0,0 +1 @@ ++{line} +" + ); + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::TurnDiffUpdated( + TurnDiffUpdatedNotification { + thread_id: "thread-2".to_string(), + turn_id: "turn-2".to_string(), + diff, + }, + ))), + &mut events, + ) + .await; + } + assert!(events.is_empty()); + + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut events, + ) + .await; + + let accepted_line_events = events + .iter() + .filter_map(|event| match event { + TrackEventRequest::AcceptedLineFingerprints(event) => Some(event), + _ => None, + }) + .collect::>(); + assert_eq!(accepted_line_events.len(), 1); + let event = accepted_line_events[0]; + assert_eq!(event.event_params.accepted_added_lines, 1); + assert_eq!(event.event_params.line_fingerprints.len(), 1); + assert_eq!( + event.event_params.line_fingerprints[0].line_hash, + crate::fingerprint_hash("line", "let latest_value = 2;") + ); +} + #[test] fn compaction_event_serializes_expected_shape() { let event = TrackEventRequest::Compaction(Box::new(CodexCompactionEventRequest { diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index 6d6d446560..1f2a7015c6 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -30,6 +30,7 @@ use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerResponse; use codex_login::AuthManager; +use codex_login::CodexAuth; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; use std::collections::HashSet; @@ -351,6 +352,7 @@ impl AnalyticsEventsClient { notification, ServerNotification::TurnStarted(_) | ServerNotification::TurnCompleted(_) + | ServerNotification::TurnDiffUpdated(_) | ServerNotification::ItemStarted(_) | ServerNotification::ItemCompleted(_) | ServerNotification::ItemGuardianApprovalReviewStarted(_) @@ -370,6 +372,7 @@ async fn send_track_events( if events.is_empty() { return; } + let Some(auth) = auth_manager.auth().await else { return; }; @@ -379,12 +382,45 @@ async fn send_track_events( let base_url = base_url.trim_end_matches('/'); let url = format!("{base_url}/codex/analytics-events/events"); + for events in track_event_request_batches(events) { + send_track_events_request(&auth, &url, events).await; + } +} + +fn track_event_request_batches(events: Vec) -> Vec> { + let mut batches = Vec::new(); + let mut current_batch = Vec::new(); + + for event in events { + if event.should_send_in_isolated_request() { + if !current_batch.is_empty() { + batches.push(current_batch); + current_batch = Vec::new(); + } + batches.push(vec![event]); + } else { + current_batch.push(event); + } + } + + if !current_batch.is_empty() { + batches.push(current_batch); + } + + batches +} + +async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec) { + if events.is_empty() { + return; + } + let payload = TrackEventsRequest { events }; let response = create_client() - .post(&url) + .post(url) .timeout(ANALYTICS_EVENTS_TIMEOUT) - .headers(codex_model_provider::auth_provider_from_auth(&auth).to_auth_headers()) + .headers(codex_model_provider::auth_provider_from_auth(auth).to_auth_headers()) .header("Content-Type", "application/json") .json(&payload) .send() diff --git a/codex-rs/analytics/src/client_tests.rs b/codex-rs/analytics/src/client_tests.rs index 3021d558d6..2ddc21e477 100644 --- a/codex-rs/analytics/src/client_tests.rs +++ b/codex-rs/analytics/src/client_tests.rs @@ -1,6 +1,14 @@ use super::AnalyticsEventsClient; use super::AnalyticsEventsQueue; +use super::track_event_request_batches; +use crate::events::CodexAcceptedLineFingerprintsEventParams; +use crate::events::CodexAcceptedLineFingerprintsEventRequest; +use crate::events::SkillInvocationEventParams; +use crate::events::SkillInvocationEventRequest; +use crate::events::TrackEventRequest; +use crate::facts::AcceptedLineFingerprint; use crate::facts::AnalyticsFact; +use crate::facts::InvocationType; use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer; use codex_app_server_protocol::AskForApproval as AppServerAskForApproval; use codex_app_server_protocol::ClientRequest; @@ -31,6 +39,47 @@ use std::sync::Mutex; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; +fn sample_accepted_line_fingerprint_event(thread_id: &str) -> TrackEventRequest { + TrackEventRequest::AcceptedLineFingerprints(Box::new( + CodexAcceptedLineFingerprintsEventRequest { + event_type: "codex_accepted_line_fingerprints", + event_params: CodexAcceptedLineFingerprintsEventParams { + event_type: "codex.accepted_line_fingerprints", + turn_id: "turn-1".to_string(), + thread_id: thread_id.to_string(), + product_surface: Some("codex".to_string()), + model_slug: Some("gpt-5.1-codex".to_string()), + completed_at: 1, + repo_hash: None, + accepted_added_lines: 1, + accepted_deleted_lines: 0, + line_fingerprints: vec![AcceptedLineFingerprint { + path_hash: "path-hash".to_string(), + line_hash: "line-hash".to_string(), + }], + }, + }, + )) +} + +fn sample_regular_track_event(thread_id: &str) -> TrackEventRequest { + TrackEventRequest::SkillInvocation(SkillInvocationEventRequest { + event_type: "skill_invocation", + skill_id: format!("skill-{thread_id}"), + skill_name: "doc".to_string(), + event_params: SkillInvocationEventParams { + product_client_id: None, + skill_scope: None, + plugin_id: None, + repo_url: None, + thread_id: Some(thread_id.to_string()), + turn_id: Some("turn-1".to_string()), + invoke_type: Some(InvocationType::Explicit), + model_slug: Some("gpt-5.1-codex".to_string()), + }, + }) +} + fn client_with_receiver() -> (AnalyticsEventsClient, mpsc::Receiver) { let (sender, receiver) = mpsc::channel(8); let queue = AnalyticsEventsQueue { @@ -222,3 +271,23 @@ fn track_response_only_enqueues_analytics_relevant_responses() { ); assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty))); } + +#[test] +fn track_event_request_batches_only_isolates_accepted_line_fingerprint_events() { + let batches = track_event_request_batches(vec![ + sample_regular_track_event("thread-1"), + sample_regular_track_event("thread-2"), + sample_accepted_line_fingerprint_event("thread-3"), + sample_accepted_line_fingerprint_event("thread-4"), + sample_regular_track_event("thread-5"), + sample_regular_track_event("thread-6"), + ]); + + assert_eq!(batches.len(), 4); + assert_eq!(batches[0].len(), 2); + assert_eq!(batches[1].len(), 1); + assert_eq!(batches[2].len(), 1); + assert_eq!(batches[3].len(), 2); + assert!(batches[1][0].should_send_in_isolated_request()); + assert!(batches[2][0].should_send_in_isolated_request()); +} diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 23afd83b75..f058988d6c 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -1,5 +1,6 @@ use std::time::Instant; +use crate::facts::AcceptedLineFingerprint; use crate::facts::AppInvocation; use crate::facts::CodexCompactionEvent; use crate::facts::CompactionImplementation; @@ -70,6 +71,7 @@ pub(crate) enum TrackEventRequest { CollabAgentToolCall(CodexCollabAgentToolCallEventRequest), WebSearch(CodexWebSearchEventRequest), ImageGeneration(CodexImageGenerationEventRequest), + AcceptedLineFingerprints(Box), #[allow(dead_code)] ReviewEvent(CodexReviewEventRequest), PluginUsed(CodexPluginUsedEventRequest), @@ -79,6 +81,32 @@ pub(crate) enum TrackEventRequest { PluginDisabled(CodexPluginEventRequest), } +impl TrackEventRequest { + pub(crate) fn should_send_in_isolated_request(&self) -> bool { + matches!(self, Self::AcceptedLineFingerprints(_)) + } +} + +#[derive(Serialize)] +pub(crate) struct CodexAcceptedLineFingerprintsEventParams { + pub(crate) event_type: &'static str, + pub(crate) turn_id: String, + pub(crate) thread_id: String, + pub(crate) product_surface: Option, + pub(crate) model_slug: Option, + pub(crate) completed_at: u64, + pub(crate) repo_hash: Option, + pub(crate) accepted_added_lines: u64, + pub(crate) accepted_deleted_lines: u64, + pub(crate) line_fingerprints: Vec, +} + +#[derive(Serialize)] +pub(crate) struct CodexAcceptedLineFingerprintsEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexAcceptedLineFingerprintsEventParams, +} + #[derive(Serialize)] pub(crate) struct SkillInvocationEventRequest { pub(crate) event_type: &'static str, diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 861e6534a2..db35f7469d 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -28,6 +28,12 @@ use codex_protocol::protocol::TokenUsage; use serde::Serialize; use std::path::PathBuf; +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct AcceptedLineFingerprint { + pub path_hash: String, + pub line_hash: String, +} + #[derive(Clone)] pub struct TrackEventsContext { pub model_slug: String, diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index 2fb23199cb..55e9c1b1f8 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -1,3 +1,5 @@ +mod accepted_line_events; +mod accepted_lines; mod client; mod events; mod facts; @@ -6,6 +8,8 @@ mod reducer; use std::time::SystemTime; use std::time::UNIX_EPOCH; +pub use accepted_lines::accepted_line_fingerprints_from_unified_diff; +pub use accepted_lines::fingerprint_hash; pub use client::AnalyticsEventsClient; pub use events::AppServerRpcTransport; pub use events::GuardianApprovalRequestSource; @@ -17,6 +21,7 @@ pub use events::GuardianReviewSessionKind; pub use events::GuardianReviewTerminalStatus; pub use events::GuardianReviewTrackContext; pub use events::GuardianReviewedAction; +pub use facts::AcceptedLineFingerprint; pub use facts::AnalyticsJsonRpcError; pub use facts::AppInvocation; pub use facts::CodexCompactionEvent; diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index d35fb96602..067e8252f6 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -1,3 +1,7 @@ +use crate::accepted_line_events::AcceptedLineFingerprintEventInput; +use crate::accepted_line_events::accepted_line_fingerprint_event_requests; +use crate::accepted_line_events::accepted_line_repo_hash_for_cwd; +use crate::accepted_lines::accepted_line_fingerprints_from_unified_diff; use crate::events::AppServerRpcTransport; use crate::events::CodexAppMentionedEventRequest; use crate::events::CodexAppServerClientMetadata; @@ -104,6 +108,7 @@ use codex_protocol::protocol::TokenUsage; use sha1::Digest; use std::collections::HashMap; use std::path::Path; +use std::path::PathBuf; #[derive(Default)] pub(crate) struct AnalyticsReducer { @@ -264,6 +269,7 @@ struct TurnState { started_at: Option, token_usage: Option, completed: Option, + latest_diff: Option, steer_count: usize, } @@ -305,7 +311,7 @@ impl AnalyticsReducer { response, } => { if let Some(response) = response.into_client_response(request_id) { - self.ingest_response(connection_id, response, out); + self.ingest_response(connection_id, response, out).await; } } AnalyticsFact::ErrorResponse { @@ -317,7 +323,7 @@ impl AnalyticsReducer { self.ingest_error_response(connection_id, request_id, error_type, out); } AnalyticsFact::Notification(notification) => { - self.ingest_notification(*notification, out); + self.ingest_notification(*notification, out).await; } AnalyticsFact::ServerRequest { connection_id: _connection_id, @@ -337,10 +343,10 @@ impl AnalyticsReducer { self.ingest_guardian_review(*input, out); } CustomAnalyticsFact::TurnResolvedConfig(input) => { - self.ingest_turn_resolved_config(*input, out); + self.ingest_turn_resolved_config(*input, out).await; } CustomAnalyticsFact::TurnTokenUsage(input) => { - self.ingest_turn_token_usage(*input, out); + self.ingest_turn_token_usage(*input, out).await; } CustomAnalyticsFact::SkillInvoked(input) => { self.ingest_skill_invoked(input, out).await; @@ -472,7 +478,7 @@ impl AnalyticsReducer { } } - fn ingest_turn_resolved_config( + async fn ingest_turn_resolved_config( &mut self, input: TurnResolvedConfigFact, out: &mut Vec, @@ -488,15 +494,16 @@ impl AnalyticsReducer { started_at: None, token_usage: None, completed: None, + latest_diff: None, steer_count: 0, }); turn_state.thread_id = Some(thread_id); turn_state.num_input_images = Some(num_input_images); turn_state.resolved_config = Some(input); - self.maybe_emit_turn_event(&turn_id, out); + self.maybe_emit_turn_event(&turn_id, out).await; } - fn ingest_turn_token_usage( + async fn ingest_turn_token_usage( &mut self, input: TurnTokenUsageFact, out: &mut Vec, @@ -510,11 +517,12 @@ impl AnalyticsReducer { started_at: None, token_usage: None, completed: None, + latest_diff: None, steer_count: 0, }); turn_state.thread_id = Some(input.thread_id); turn_state.token_usage = Some(input.token_usage); - self.maybe_emit_turn_event(&turn_id, out); + self.maybe_emit_turn_event(&turn_id, out).await; } async fn ingest_skill_invoked( @@ -621,7 +629,7 @@ impl AnalyticsReducer { }); } - fn ingest_response( + async fn ingest_response( &mut self, connection_id: u64, response: ClientResponse, @@ -673,12 +681,13 @@ impl AnalyticsReducer { started_at: None, token_usage: None, completed: None, + latest_diff: None, steer_count: 0, }); turn_state.connection_id = Some(connection_id); turn_state.thread_id = Some(pending_request.thread_id); turn_state.num_input_images = Some(pending_request.num_input_images); - self.maybe_emit_turn_event(&turn_id, out); + self.maybe_emit_turn_event(&turn_id, out).await; } ClientResponse::TurnSteer { request_id, @@ -740,7 +749,7 @@ impl AnalyticsReducer { ); } - fn ingest_notification( + async fn ingest_notification( &mut self, notification: ServerNotification, out: &mut Vec, @@ -811,6 +820,7 @@ impl AnalyticsReducer { started_at: None, token_usage: None, completed: None, + latest_diff: None, steer_count: 0, }); turn_state.started_at = notification @@ -818,6 +828,24 @@ impl AnalyticsReducer { .started_at .and_then(|started_at| u64::try_from(started_at).ok()); } + ServerNotification::TurnDiffUpdated(notification) => { + let turn_state = + self.turns + .entry(notification.turn_id.clone()) + .or_insert(TurnState { + connection_id: None, + thread_id: None, + num_input_images: None, + resolved_config: None, + started_at: None, + token_usage: None, + completed: None, + latest_diff: None, + steer_count: 0, + }); + turn_state.thread_id = Some(notification.thread_id); + turn_state.latest_diff = Some(notification.diff); + } ServerNotification::TurnCompleted(notification) => { let turn_state = self.turns @@ -830,6 +858,7 @@ impl AnalyticsReducer { started_at: None, token_usage: None, completed: None, + latest_diff: None, steer_count: 0, }); turn_state.completed = Some(CompletedTurnState { @@ -849,7 +878,7 @@ impl AnalyticsReducer { .and_then(|duration_ms| u64::try_from(duration_ms).ok()), }); let turn_id = notification.turn.id; - self.maybe_emit_turn_event(&turn_id, out); + self.maybe_emit_turn_event(&turn_id, out).await; } _ => {} } @@ -985,7 +1014,7 @@ impl AnalyticsReducer { })); } - fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec) { + async fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec) { let Some(turn_state) = self.turns.get(turn_id) else { return; }; @@ -1018,18 +1047,23 @@ impl AnalyticsReducer { warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); return; }; - out.push(TrackEventRequest::TurnEvent(Box::new( - CodexTurnEventRequest { - event_type: "codex_turn_event", - event_params: codex_turn_event_params( - connection_state.app_server_client.clone(), - connection_state.runtime.clone(), - turn_id.to_string(), - turn_state, - thread_metadata, - ), - }, - ))); + let turn_event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest { + event_type: "codex_turn_event", + event_params: codex_turn_event_params( + connection_state.app_server_client.clone(), + connection_state.runtime.clone(), + turn_id.to_string(), + turn_state, + thread_metadata, + ), + })); + let accepted_line_event = accepted_line_event_input(turn_id, turn_state); + + out.push(turn_event); + if let Some((mut input, cwd)) = accepted_line_event { + input.repo_hash = accepted_line_repo_hash_for_cwd(cwd.as_path()).await; + out.extend(accepted_line_fingerprint_event_requests(input)); + } self.turns.remove(turn_id); } @@ -1641,6 +1675,36 @@ fn web_search_query_count(query: &str, action: Option<&WebSearchAction>) -> Opti } } +fn accepted_line_event_input( + turn_id: &str, + turn_state: &TurnState, +) -> Option<(AcceptedLineFingerprintEventInput, PathBuf)> { + let latest_diff = turn_state.latest_diff.as_deref()?; + let summary = accepted_line_fingerprints_from_unified_diff(latest_diff); + if summary.accepted_added_lines == 0 && summary.accepted_deleted_lines == 0 { + return None; + } + + let thread_id = turn_state.thread_id.clone()?; + let resolved_config = turn_state.resolved_config.clone()?; + + Some(( + AcceptedLineFingerprintEventInput { + event_type: "codex.accepted_line_fingerprints", + turn_id: turn_id.to_string(), + thread_id, + product_surface: Some("codex".to_string()), + model_slug: Some(resolved_config.model.clone()), + completed_at: now_unix_seconds(), + repo_hash: None, + accepted_added_lines: summary.accepted_added_lines, + accepted_deleted_lines: summary.accepted_deleted_lines, + line_fingerprints: summary.line_fingerprints, + }, + resolved_config.permission_profile_cwd, + )) +} + fn codex_turn_event_params( app_server_client: CodexAppServerClientMetadata, runtime: CodexRuntimeMetadata, diff --git a/codex-rs/git-utils/src/info.rs b/codex-rs/git-utils/src/info.rs index 067dd15869..c6656b8fa5 100644 --- a/codex-rs/git-utils/src/info.rs +++ b/codex-rs/git-utils/src/info.rs @@ -158,6 +158,108 @@ pub async fn get_head_commit_hash(cwd: &Path) -> Option { } } +pub fn canonicalize_git_remote_url(url: &str) -> Option { + let url = trim_git_suffix(url.trim().trim_end_matches('/')); + if url.is_empty() { + return None; + } + + if let Some((scheme, rest)) = url.split_once("://") { + return canonicalize_git_url_like_remote(scheme, rest); + } + + if let Some((host_part, path)) = parse_scp_like_remote(url) { + return canonicalize_git_remote_host_path(host_part, path, /*default_port*/ None); + } + + let (host_part, path) = url.split_once('/')?; + canonicalize_git_remote_host_path(host_part, path, /*default_port*/ None) +} + +fn canonicalize_git_url_like_remote(scheme: &str, rest: &str) -> Option { + let default_port = match scheme { + "git" => Some("9418"), + "http" => Some("80"), + "https" => Some("443"), + "ssh" => Some("22"), + _ => return None, + }; + + let rest = rest + .find(['?', '#']) + .map_or(rest, |suffix_index| &rest[..suffix_index]); + let (host_part, path) = rest.split_once('/')?; + canonicalize_git_remote_host_path(host_part, path, default_port) +} + +fn parse_scp_like_remote(remote: &str) -> Option<(&str, &str)> { + if remote.contains('/') + && remote + .find('/') + .is_some_and(|slash| remote.find(':').is_none_or(|colon| slash < colon)) + { + return None; + } + + let (host_part, path) = remote.split_once(':')?; + if host_part.is_empty() || path.is_empty() { + return None; + } + Some((host_part, path)) +} + +fn canonicalize_git_remote_host_path( + host_part: &str, + path: &str, + default_port: Option<&str>, +) -> Option { + let host = normalize_remote_host( + host_part + .rsplit_once('@') + .map_or(host_part, |(_, host)| host) + .trim() + .trim_end_matches('/'), + default_port, + ); + if host.is_empty() { + return None; + } + + let path = trim_git_suffix(path.trim().trim_matches('/')); + let components = path + .split('/') + .filter(|component| !component.is_empty()) + .collect::>(); + let [owner, repo, ..] = components.as_slice() else { + return None; + }; + if matches!((*owner, *repo), ("." | "..", _) | (_, "." | "..")) { + return None; + } + let path = components.join("/"); + + if host == "github.com" { + Some(format!("{host}/{}", path.to_ascii_lowercase())) + } else { + Some(format!("{host}/{path}")) + } +} + +fn normalize_remote_host(host: &str, default_port: Option<&str>) -> String { + let host = host.to_ascii_lowercase(); + if let Some(default_port) = default_port + && let Some((host_without_port, port)) = host.rsplit_once(':') + && port == default_port + { + return host_without_port.to_string(); + } + host +} + +fn trim_git_suffix(value: &str) -> &str { + value.strip_suffix(".git").unwrap_or(value) +} + pub async fn get_has_changes(cwd: &Path) -> Option { let output = run_git_command_with_timeout(&["status", "--porcelain"], cwd).await?; if !output.status.success() { @@ -724,3 +826,46 @@ pub async fn current_branch_name(cwd: &Path) -> Option { .map(|s| s.trim().to_string()) .filter(|name| !name.is_empty()) } + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn canonicalize_git_remote_url_normalizes_github_variants() { + for remote in [ + "git@github.com:OpenAI/Codex.git", + "ssh://git@github.com/openai/codex.git", + "ssh://git@github.com:22/OpenAI/Codex.git", + "https://github.com/openai/codex.git", + "https://github.com:443/openai/codex.git", + "https://token@github.com/openai/codex/", + "github.com/OpenAI/Codex.git", + ] { + assert_eq!( + canonicalize_git_remote_url(remote), + Some("github.com/openai/codex".to_string()) + ); + } + } + + #[test] + fn canonicalize_git_remote_url_handles_ghe_without_lowercasing_path() { + assert_eq!( + canonicalize_git_remote_url("git@ghe.company.com:Org/Repo.git"), + Some("ghe.company.com/Org/Repo".to_string()) + ); + assert_eq!( + canonicalize_git_remote_url("ssh://git@ghe.company.com:2222/Org/Repo.git"), + Some("ghe.company.com:2222/Org/Repo".to_string()) + ); + } + + #[test] + fn canonicalize_git_remote_url_rejects_non_repository_values() { + for remote in ["", "file:///tmp/repo", "github.com/openai", "/tmp/repo"] { + assert_eq!(canonicalize_git_remote_url(remote), None); + } + } +} diff --git a/codex-rs/git-utils/src/lib.rs b/codex-rs/git-utils/src/lib.rs index 63eaf586d5..bcd1a8b5c9 100644 --- a/codex-rs/git-utils/src/lib.rs +++ b/codex-rs/git-utils/src/lib.rs @@ -24,6 +24,7 @@ pub use errors::GitToolingError; pub use info::CommitLogEntry; pub use info::GitDiffToRemote; pub use info::GitInfo; +pub use info::canonicalize_git_remote_url; pub use info::collect_git_info; pub use info::current_branch_name; pub use info::default_branch_name;