From fbdbc6b2fea7522e9fc1fc87d88168b945507ad4 Mon Sep 17 00:00:00 2001 From: rhan-oai Date: Wed, 6 May 2026 13:27:41 -0700 Subject: [PATCH] [codex-analytics] emit tool item events from item lifecycle (#17090) ## Why After the tool-item schemas are in place, analytics needs to emit them from the app-server item lifecycle rather than requiring bespoke tracking at each callsite. The reducer should also reuse the shared thread analytics context introduced below it in the stack so later event families do not repeat the same reducer joins or missing-state ladder. ## What changed - Tracks tool-item completion notifications and emits the matching tool analytics event when a terminal item arrives. - Derives event-specific payload details for command execution, file changes, MCP calls, dynamic tools, collaboration tools, web search, and image generation. - Denormalizes thread, app-server client, runtime, and subagent provenance metadata through the shared thread analytics context. - Adds reducer coverage for item lifecycle emission and subagent metadata inheritance. ## Duration semantics `duration_ms` is computed from the app-server item lifecycle timestamps: `completed_at_ms - started_at_ms`. That makes it the duration of the lifecycle Codex observed locally, not necessarily the upstream provider's full execution time. - Web search usually has a meaningful observed lifecycle because Responses can send `response.output_item.added` before `response.output_item.done`; in that case `started_at_ms` comes from the added event and `completed_at_ms` comes from the done event. - Image generation can be much less precise. In the current observed stream, image generation often arrives only as a completed `response.output_item.done`; when there is no earlier added event, Codex synthesizes the started item immediately before completion, so `duration_ms` can be `0` even though upstream image generation took longer. - Standalone web search and standalone image generation work is expected to land after this stack. Those paths may introduce more direct lifecycle events or timing points, so the current web-search/image-generation duration semantics should be treated as the best available item-lifecycle approximation, not the final latency contract for those tool families. - `execution_duration_ms` is populated only where the completed item already carries a native execution duration; otherwise it remains `null` while `duration_ms` still reflects the local lifecycle interval. ## Currently placeholder / partial fields Some fields are included in the schema for the intended steady-state contract, but this PR does not yet populate them from real approval/review state: - `review_count`, `guardian_review_count`, and `user_review_count` currently default to `0`. - `final_approval_outcome` currently defaults to `unknown`. - `requested_additional_permissions` and `requested_network_access` currently default to `false`. ## Verification - `cargo test -p codex-analytics` --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/17090). * #18748 * #18747 * __->__ #17090 * #17089 * #20514 --- .../analytics/src/analytics_client_tests.rs | 274 +++++++- codex-rs/analytics/src/client.rs | 19 +- codex-rs/analytics/src/events.rs | 23 +- codex-rs/analytics/src/lib.rs | 24 + codex-rs/analytics/src/reducer.rs | 660 +++++++++++++++++- .../app-server/src/bespoke_event_handling.rs | 42 -- codex-rs/app-server/src/message_processor.rs | 1 - codex-rs/app-server/src/outgoing_message.rs | 7 +- .../request_processors/thread_lifecycle.rs | 3 - .../request_processors/thread_processor.rs | 5 - .../src/request_processors/turn_processor.rs | 1 - 11 files changed, 980 insertions(+), 79 deletions(-) diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 121a378566..fd294385e5 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -12,7 +12,6 @@ use crate::events::CodexPluginUsedEventRequest; use crate::events::CodexRuntimeMetadata; use crate::events::CodexToolItemEventBase; use crate::events::CodexTurnEventRequest; -use crate::events::CommandExecutionSource; use crate::events::GuardianApprovalRequestSource; use crate::events::GuardianReviewDecision; use crate::events::GuardianReviewEventParams; @@ -67,8 +66,13 @@ use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponsePayload; use codex_app_server_protocol::CodexErrorInfo; +use codex_app_server_protocol::CommandAction; +use codex_app_server_protocol::CommandExecutionSource; +use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::NonSteerableTurnKind; use codex_app_server_protocol::RequestId; @@ -78,6 +82,7 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadSource as AppServerThreadSource; use codex_app_server_protocol::ThreadStartResponse; @@ -598,6 +603,90 @@ async fn ingest_turn_prerequisites( } } +async fn ingest_tool_review_prerequisites( + reducer: &mut AnalyticsReducer, + events: &mut Vec, +) { + reducer + .ingest(sample_initialize_fact(/*connection_id*/ 7), events) + .await; + reducer + .ingest( + AnalyticsFact::ClientResponse { + connection_id: 7, + request_id: RequestId::Integer(1), + response: Box::new(sample_thread_start_response( + "thread-1", /*ephemeral*/ false, "gpt-5", + )), + }, + events, + ) + .await; + events.clear(); +} + +fn sample_initialize_fact(connection_id: u64) -> AnalyticsFact { + AnalyticsFact::Initialize { + connection_id, + params: InitializeParams { + client_info: ClientInfo { + name: "codex-tui".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: false, + opt_out_notification_methods: None, + }), + }, + product_client_id: DEFAULT_ORIGINATOR.to_string(), + runtime: CodexRuntimeMetadata { + codex_rs_version: "0.99.0".to_string(), + runtime_os: "linux".to_string(), + runtime_os_version: "24.04".to_string(), + runtime_arch: "x86_64".to_string(), + }, + rpc_transport: AppServerRpcTransport::Websocket, + } +} + +fn sample_command_execution_item( + status: CommandExecutionStatus, + exit_code: Option, + duration_ms: Option, +) -> ThreadItem { + ThreadItem::CommandExecution { + id: "item-1".to_string(), + command: "echo hi".to_string(), + cwd: test_path_buf("/tmp").abs(), + process_id: Some("pid-1".to_string()), + source: CommandExecutionSource::Agent, + status, + command_actions: Vec::new(), + aggregated_output: None, + exit_code, + duration_ms, + } +} + +fn sample_command_execution_item_with_actions( + status: CommandExecutionStatus, + exit_code: Option, + duration_ms: Option, + command_actions: Vec, +) -> ThreadItem { + let mut item = sample_command_execution_item(status, exit_code, duration_ms); + let ThreadItem::CommandExecution { + command_actions: item_command_actions, + .. + } = &mut item + else { + unreachable!("sample command execution item should be CommandExecution"); + }; + *item_command_actions = command_actions; + item +} + fn expected_absolute_path(path: &PathBuf) -> String { std::fs::canonicalize(path) .unwrap_or_else(|_| path.to_path_buf()) @@ -930,6 +1019,7 @@ fn command_execution_event_serializes_expected_shape() { started_at_ms: 123_000, completed_at_ms: 125_000, duration_ms: Some(2000), + execution_duration_ms: Some(1900), review_count: 0, guardian_review_count: 0, user_review_count: 0, @@ -978,6 +1068,7 @@ fn command_execution_event_serializes_expected_shape() { "started_at_ms": 123000, "completed_at_ms": 125000, "duration_ms": 2000, + "execution_duration_ms": 1900, "review_count": 0, "guardian_review_count": 0, "user_review_count": 0, @@ -1404,6 +1495,114 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { assert_eq!(payload[0]["event_params"]["review_timeout_ms"], 90_000); } +#[tokio::test] +async fn item_lifecycle_notifications_publish_command_execution_event() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + ingest_tool_review_prerequisites(&mut reducer, &mut events).await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted( + ItemStartedNotification { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + started_at_ms: 1_000, + item: sample_command_execution_item( + CommandExecutionStatus::InProgress, + /*exit_code*/ None, + /*duration_ms*/ None, + ), + }, + ))), + &mut events, + ) + .await; + assert!( + events.is_empty(), + "tool item event should emit on completion" + ); + + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted( + ItemCompletedNotification { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + completed_at_ms: 1_045, + item: sample_command_execution_item_with_actions( + CommandExecutionStatus::Completed, + Some(0), + Some(42), + vec![ + CommandAction::Read { + command: "cat README.md".to_string(), + name: "README.md".to_string(), + path: test_path_buf("/tmp/README.md").abs(), + }, + CommandAction::ListFiles { + command: "ls".to_string(), + path: None, + }, + CommandAction::Search { + command: "rg TODO".to_string(), + query: Some("TODO".to_string()), + path: None, + }, + CommandAction::Unknown { + command: "cargo test".to_string(), + }, + ], + ), + }, + ))), + &mut events, + ) + .await; + + let payload = serde_json::to_value(&events).expect("serialize events"); + assert_eq!(payload.as_array().expect("events array").len(), 1); + assert_eq!(payload[0]["event_type"], "codex_command_execution_event"); + assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1"); + assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1"); + assert_eq!(payload[0]["event_params"]["item_id"], "item-1"); + assert_eq!(payload[0]["event_params"]["tool_name"], "shell"); + assert_eq!( + payload[0]["event_params"]["command_execution_source"], + "agent" + ); + assert_eq!(payload[0]["event_params"]["terminal_status"], "completed"); + assert_eq!( + payload[0]["event_params"]["final_approval_outcome"], + "unknown" + ); + assert_eq!( + payload[0]["event_params"]["failure_kind"], + serde_json::Value::Null + ); + assert_eq!(payload[0]["event_params"]["exit_code"], 0); + assert_eq!(payload[0]["event_params"]["command_total_action_count"], 4); + assert_eq!(payload[0]["event_params"]["command_read_action_count"], 1); + assert_eq!( + payload[0]["event_params"]["command_list_files_action_count"], + 1 + ); + assert_eq!(payload[0]["event_params"]["command_search_action_count"], 1); + assert_eq!( + payload[0]["event_params"]["command_unknown_action_count"], + 1 + ); + assert_eq!(payload[0]["event_params"]["started_at_ms"], 1_000); + assert_eq!(payload[0]["event_params"]["completed_at_ms"], 1_045); + assert_eq!(payload[0]["event_params"]["duration_ms"], 45); + assert_eq!(payload[0]["event_params"]["execution_duration_ms"], 42); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["client_name"], + "codex-tui" + ); + assert_eq!(payload[0]["event_params"]["thread_source"], "user"); +} + #[test] fn subagent_thread_started_review_serializes_expected_shape() { let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request( @@ -1687,6 +1886,79 @@ async fn subagent_thread_started_inherits_parent_connection_for_new_thread() { ); } +#[tokio::test] +async fn subagent_tool_items_inherit_parent_connection_metadata() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + ingest_tool_review_prerequisites(&mut reducer, &mut events).await; + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted( + SubAgentThreadStartedInput { + thread_id: "thread-subagent".to_string(), + parent_thread_id: Some("thread-1".to_string()), + product_client_id: "codex-tui".to_string(), + client_name: "codex-tui".to_string(), + client_version: "1.0.0".to_string(), + model: "gpt-5".to_string(), + ephemeral: false, + subagent_source: SubAgentSource::Review, + created_at: 128, + }, + )), + &mut events, + ) + .await; + events.clear(); + + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted( + ItemStartedNotification { + thread_id: "thread-subagent".to_string(), + turn_id: "turn-subagent".to_string(), + started_at_ms: 1_000, + item: sample_command_execution_item( + CommandExecutionStatus::InProgress, + /*exit_code*/ None, + /*duration_ms*/ None, + ), + }, + ))), + &mut events, + ) + .await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted( + ItemCompletedNotification { + thread_id: "thread-subagent".to_string(), + turn_id: "turn-subagent".to_string(), + completed_at_ms: 1_042, + item: sample_command_execution_item( + CommandExecutionStatus::Completed, + Some(0), + Some(42), + ), + }, + ))), + &mut events, + ) + .await; + + let payload = serde_json::to_value(&events).expect("serialize events"); + assert_eq!(payload.as_array().expect("events array").len(), 1); + assert_eq!(payload[0]["event_type"], "codex_command_execution_event"); + assert_eq!(payload[0]["event_params"]["thread_source"], "subagent"); + assert_eq!(payload[0]["event_params"]["subagent_source"], "review"); + assert_eq!(payload[0]["event_params"]["parent_thread_id"], "thread-1"); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["client_name"], + "codex-tui" + ); +} + #[test] fn plugin_used_event_serializes_expected_shape() { let tracking = TrackEventsContext { diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index d54c53ede9..6d6d446560 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -333,10 +333,6 @@ impl AnalyticsEventsClient { }); } - pub fn track_notification(&self, notification: ServerNotification) { - self.record_fact(AnalyticsFact::Notification(Box::new(notification))); - } - pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) { self.record_fact(AnalyticsFact::ServerRequest { connection_id, @@ -349,6 +345,21 @@ impl AnalyticsEventsClient { response: Box::new(response), }); } + + pub fn track_notification(&self, notification: ServerNotification) { + if !matches!( + notification, + ServerNotification::TurnStarted(_) + | ServerNotification::TurnCompleted(_) + | ServerNotification::ItemStarted(_) + | ServerNotification::ItemCompleted(_) + | ServerNotification::ItemGuardianApprovalReviewStarted(_) + | ServerNotification::ItemGuardianApprovalReviewCompleted(_) + ) { + return; + } + self.record_fact(AnalyticsFact::Notification(Box::new(notification))); + } } async fn send_track_events( diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 383cf6857e..2232c88d39 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -20,6 +20,7 @@ use crate::facts::TurnSteerResult; use crate::facts::TurnSubmissionType; use crate::now_unix_seconds; use codex_app_server_protocol::CodexErrorInfo; +use codex_app_server_protocol::CommandExecutionSource; use codex_login::default_client::originator; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::approvals::NetworkApprovalProtocol; @@ -62,19 +63,12 @@ pub(crate) enum TrackEventRequest { Compaction(Box), TurnEvent(Box), TurnSteer(CodexTurnSteerEventRequest), - #[allow(dead_code)] CommandExecution(CodexCommandExecutionEventRequest), - #[allow(dead_code)] FileChange(CodexFileChangeEventRequest), - #[allow(dead_code)] McpToolCall(CodexMcpToolCallEventRequest), - #[allow(dead_code)] DynamicToolCall(CodexDynamicToolCallEventRequest), - #[allow(dead_code)] CollabAgentToolCall(CodexCollabAgentToolCallEventRequest), - #[allow(dead_code)] WebSearch(CodexWebSearchEventRequest), - #[allow(dead_code)] ImageGeneration(CodexImageGenerationEventRequest), PluginUsed(CodexPluginUsedEventRequest), PluginInstalled(CodexPluginEventRequest), @@ -454,7 +448,10 @@ pub(crate) struct CodexToolItemEventBase { pub(crate) tool_name: String, pub(crate) started_at_ms: u64, pub(crate) completed_at_ms: u64, + // Observed item lifecycle duration. This may undercount end-to-end execution + // for tools where app-server only sees part of the upstream flow. pub(crate) duration_ms: Option, + pub(crate) execution_duration_ms: Option, pub(crate) review_count: u64, pub(crate) guardian_review_count: u64, pub(crate) user_review_count: u64, @@ -465,17 +462,6 @@ pub(crate) struct CodexToolItemEventBase { pub(crate) requested_network_access: bool, } -#[allow(dead_code)] -#[derive(Clone, Copy, Debug, Serialize)] -#[serde(rename_all = "snake_case")] -pub(crate) enum CommandExecutionSource { - Agent, - UserShell, - UnifiedExecStartup, - UnifiedExecInteraction, -} - -#[allow(dead_code)] #[derive(Clone, Copy, Debug, Serialize)] #[serde(rename_all = "snake_case")] pub(crate) enum WebSearchActionKind { @@ -592,7 +578,6 @@ pub(crate) struct CodexWebSearchEventRequest { pub(crate) struct CodexImageGenerationEventParams { #[serde(flatten)] pub(crate) base: CodexToolItemEventBase, - pub(crate) image_generation_status: String, pub(crate) revised_prompt_present: bool, pub(crate) saved_path_present: bool, } diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index ed0f1036ca..2fb23199cb 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -51,3 +51,27 @@ pub fn now_unix_seconds() -> u64 { .unwrap_or_default() .as_secs() } + +pub fn now_unix_millis() -> u64 { + u64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + ) + .unwrap_or(u64::MAX) +} + +pub(crate) fn serialize_enum_as_string(value: &T) -> Option { + serde_json::to_value(value) + .ok() + .and_then(|value| value.as_str().map(str::to_string)) +} + +pub(crate) fn usize_to_u64(value: usize) -> u64 { + u64::try_from(value).unwrap_or(u64::MAX) +} + +pub(crate) fn option_i64_to_u64(value: Option) -> Option { + value.and_then(|value| u64::try_from(value).ok()) +} diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 772bb6c624..81530444de 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -2,15 +2,30 @@ use crate::events::AppServerRpcTransport; use crate::events::CodexAppMentionedEventRequest; use crate::events::CodexAppServerClientMetadata; use crate::events::CodexAppUsedEventRequest; +use crate::events::CodexCollabAgentToolCallEventParams; +use crate::events::CodexCollabAgentToolCallEventRequest; +use crate::events::CodexCommandExecutionEventParams; +use crate::events::CodexCommandExecutionEventRequest; use crate::events::CodexCompactionEventRequest; +use crate::events::CodexDynamicToolCallEventParams; +use crate::events::CodexDynamicToolCallEventRequest; +use crate::events::CodexFileChangeEventParams; +use crate::events::CodexFileChangeEventRequest; use crate::events::CodexHookRunEventRequest; +use crate::events::CodexImageGenerationEventParams; +use crate::events::CodexImageGenerationEventRequest; +use crate::events::CodexMcpToolCallEventParams; +use crate::events::CodexMcpToolCallEventRequest; use crate::events::CodexPluginEventRequest; use crate::events::CodexPluginUsedEventRequest; use crate::events::CodexRuntimeMetadata; +use crate::events::CodexToolItemEventBase; use crate::events::CodexTurnEventParams; use crate::events::CodexTurnEventRequest; use crate::events::CodexTurnSteerEventParams; use crate::events::CodexTurnSteerEventRequest; +use crate::events::CodexWebSearchEventParams; +use crate::events::CodexWebSearchEventRequest; use crate::events::GuardianReviewEventParams; use crate::events::GuardianReviewEventPayload; use crate::events::GuardianReviewEventRequest; @@ -18,7 +33,11 @@ use crate::events::SkillInvocationEventParams; use crate::events::SkillInvocationEventRequest; use crate::events::ThreadInitializedEvent; use crate::events::ThreadInitializedEventParams; +use crate::events::ToolItemFailureKind; +use crate::events::ToolItemFinalApprovalOutcome; +use crate::events::ToolItemTerminalStatus; use crate::events::TrackEventRequest; +use crate::events::WebSearchActionKind; use crate::events::codex_app_metadata; use crate::events::codex_compaction_event_params; use crate::events::codex_hook_run_metadata; @@ -47,14 +66,30 @@ use crate::facts::TurnSteerRejectionReason; use crate::facts::TurnSteerResult; use crate::facts::TurnTokenUsageFact; use crate::now_unix_seconds; +use crate::option_i64_to_u64; +use crate::serialize_enum_as_string; +use crate::usize_to_u64; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::CodexErrorInfo; +use codex_app_server_protocol::CollabAgentStatus; +use codex_app_server_protocol::CollabAgentTool; +use codex_app_server_protocol::CollabAgentToolCallStatus; +use codex_app_server_protocol::CommandAction; +use codex_app_server_protocol::CommandExecutionSource; +use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::DynamicToolCallOutputContentItem; +use codex_app_server_protocol::DynamicToolCallStatus; use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::McpToolCallStatus; +use codex_app_server_protocol::PatchApplyStatus; +use codex_app_server_protocol::PatchChangeKind; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::TurnSteerResponse; use codex_app_server_protocol::UserInput; +use codex_app_server_protocol::WebSearchAction; use codex_git_utils::collect_git_info; use codex_git_utils::get_git_repo_root; use codex_login::default_client::originator; @@ -76,6 +111,7 @@ pub(crate) struct AnalyticsReducer { turns: HashMap, connections: HashMap, threads: HashMap, + tool_items_started_at_ms: HashMap, } struct ConnectionState { @@ -119,6 +155,19 @@ impl<'a> AnalyticsDropSite<'a> { } } + fn tool_item( + notification: &'a codex_app_server_protocol::ItemCompletedNotification, + item_id: &'a str, + ) -> Self { + Self { + event_name: "tool item", + thread_id: ¬ification.thread_id, + turn_id: Some(¬ification.turn_id), + review_id: None, + item_id: Some(item_id), + } + } + fn turn_steer(thread_id: &'a str) -> Self { Self { event_name: "turn steer", @@ -218,6 +267,13 @@ struct TurnState { steer_count: usize, } +#[derive(Hash, Eq, PartialEq)] +struct ToolItemKey { + thread_id: String, + turn_id: String, + item_id: String, +} + impl AnalyticsReducer { pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec) { match input { @@ -690,6 +746,62 @@ impl AnalyticsReducer { out: &mut Vec, ) { match notification { + ServerNotification::ItemStarted(notification) => { + let Some(item_id) = tracked_tool_item_id(¬ification.item) else { + return; + }; + let Some(started_at_ms) = option_i64_to_u64(Some(notification.started_at_ms)) + else { + return; + }; + self.tool_items_started_at_ms.insert( + ToolItemKey { + thread_id: notification.thread_id, + turn_id: notification.turn_id, + item_id: item_id.to_string(), + }, + started_at_ms, + ); + } + ServerNotification::ItemCompleted(notification) => { + let Some(item_id) = tracked_tool_item_id(¬ification.item) else { + return; + }; + let key = ToolItemKey { + thread_id: notification.thread_id.clone(), + turn_id: notification.turn_id.clone(), + item_id: item_id.to_string(), + }; + let Some(started_at_ms) = self.tool_items_started_at_ms.remove(&key) else { + tracing::warn!( + thread_id = %notification.thread_id, + turn_id = %notification.turn_id, + item_id, + "dropping tool item analytics event: missing item started notification" + ); + return; + }; + let Some(completed_at_ms) = option_i64_to_u64(Some(notification.completed_at_ms)) + else { + return; + }; + let Some((connection_state, thread_metadata)) = self + .thread_context_or_warn(AnalyticsDropSite::tool_item(¬ification, item_id)) + else { + return; + }; + if let Some(event) = tool_item_event( + ¬ification.thread_id, + ¬ification.turn_id, + ¬ification.item, + started_at_ms, + completed_at_ms, + connection_state, + thread_metadata, + ) { + out.push(event); + } + } ServerNotification::TurnStarted(notification) => { let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState { connection_id: None, @@ -779,7 +891,7 @@ impl AnalyticsReducer { ephemeral: thread.ephemeral, thread_source: thread_metadata.thread_source, initialization_mode, - subagent_source: thread_metadata.subagent_source, + subagent_source: thread_metadata.subagent_source.clone(), parent_thread_id: thread_metadata.parent_thread_id, created_at: u64::try_from(thread.created_at).unwrap_or_default(), }, @@ -983,6 +1095,552 @@ fn warn_missing_analytics_context( ); } +fn tracked_tool_item_id(item: &ThreadItem) -> Option<&str> { + match item { + ThreadItem::CommandExecution { id, .. } + | ThreadItem::FileChange { id, .. } + | ThreadItem::McpToolCall { id, .. } + | ThreadItem::DynamicToolCall { id, .. } + | ThreadItem::CollabAgentToolCall { id, .. } + | ThreadItem::WebSearch { id, .. } + | ThreadItem::ImageGeneration { id, .. } => Some(id), + ThreadItem::UserMessage { .. } + | ThreadItem::HookPrompt { .. } + | ThreadItem::AgentMessage { .. } + | ThreadItem::Plan { .. } + | ThreadItem::Reasoning { .. } + | ThreadItem::ImageView { .. } + | ThreadItem::EnteredReviewMode { .. } + | ThreadItem::ExitedReviewMode { .. } + | ThreadItem::ContextCompaction { .. } => None, + } +} + +fn tool_item_event( + thread_id: &str, + turn_id: &str, + item: &ThreadItem, + started_at_ms: u64, + completed_at_ms: u64, + connection_state: &ConnectionState, + thread_metadata: &ThreadMetadataState, +) -> Option { + let context = ToolItemContext { + started_at_ms, + completed_at_ms, + connection_state, + thread_metadata, + }; + match item { + ThreadItem::CommandExecution { + id, + source, + status, + command_actions, + exit_code, + duration_ms, + .. + } => { + let (terminal_status, failure_kind) = command_execution_outcome(status)?; + let action_counts = command_action_counts(command_actions); + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + command_execution_tool_name(*source).to_string(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: option_i64_to_u64(*duration_ms), + }, + context, + ); + Some(TrackEventRequest::CommandExecution( + CodexCommandExecutionEventRequest { + event_type: "codex_command_execution_event", + event_params: CodexCommandExecutionEventParams { + base, + command_execution_source: *source, + exit_code: *exit_code, + command_total_action_count: action_counts.total, + command_read_action_count: action_counts.read, + command_list_files_action_count: action_counts.list_files, + command_search_action_count: action_counts.search, + command_unknown_action_count: action_counts.unknown, + }, + }, + )) + } + ThreadItem::FileChange { + id, + changes, + status, + } => { + let (terminal_status, failure_kind) = patch_apply_outcome(status)?; + let counts = file_change_counts(changes); + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + "apply_patch".to_string(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: None, + }, + context, + ); + Some(TrackEventRequest::FileChange(CodexFileChangeEventRequest { + event_type: "codex_file_change_event", + event_params: CodexFileChangeEventParams { + base, + file_change_count: usize_to_u64(changes.len()), + file_add_count: counts.add, + file_update_count: counts.update, + file_delete_count: counts.delete, + file_move_count: counts.move_, + }, + })) + } + ThreadItem::McpToolCall { + id, + server, + tool, + status, + error, + duration_ms, + .. + } => { + let (terminal_status, failure_kind) = mcp_tool_call_outcome(status)?; + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + tool.clone(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: option_i64_to_u64(*duration_ms), + }, + context, + ); + Some(TrackEventRequest::McpToolCall( + CodexMcpToolCallEventRequest { + event_type: "codex_mcp_tool_call_event", + event_params: CodexMcpToolCallEventParams { + base, + mcp_server_name: server.clone(), + mcp_tool_name: tool.clone(), + mcp_error_present: error.is_some(), + }, + }, + )) + } + ThreadItem::DynamicToolCall { + id, + tool, + status, + content_items, + success, + duration_ms, + .. + } => { + let (terminal_status, failure_kind) = dynamic_tool_call_outcome(status)?; + let counts = content_items + .as_ref() + .map(|items| dynamic_content_counts(items)); + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + tool.clone(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: option_i64_to_u64(*duration_ms), + }, + context, + ); + Some(TrackEventRequest::DynamicToolCall( + CodexDynamicToolCallEventRequest { + event_type: "codex_dynamic_tool_call_event", + event_params: CodexDynamicToolCallEventParams { + base, + dynamic_tool_name: tool.clone(), + success: *success, + output_content_item_count: counts.map(|counts| counts.total), + output_text_item_count: counts.map(|counts| counts.text), + output_image_item_count: counts.map(|counts| counts.image), + }, + }, + )) + } + ThreadItem::CollabAgentToolCall { + id, + tool, + status, + sender_thread_id, + receiver_thread_ids, + model, + reasoning_effort, + agents_states, + .. + } => { + let (terminal_status, failure_kind) = collab_tool_call_outcome(status)?; + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + collab_agent_tool_name(tool).to_string(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: None, + }, + context, + ); + Some(TrackEventRequest::CollabAgentToolCall( + CodexCollabAgentToolCallEventRequest { + event_type: "codex_collab_agent_tool_call_event", + event_params: CodexCollabAgentToolCallEventParams { + base, + sender_thread_id: sender_thread_id.clone(), + receiver_thread_count: usize_to_u64(receiver_thread_ids.len()), + receiver_thread_ids: Some(receiver_thread_ids.clone()), + requested_model: model.clone(), + requested_reasoning_effort: reasoning_effort + .as_ref() + .and_then(serialize_enum_as_string), + agent_state_count: Some(usize_to_u64(agents_states.len())), + completed_agent_count: Some(usize_to_u64( + agents_states + .values() + .filter(|state| state.status == CollabAgentStatus::Completed) + .count(), + )), + failed_agent_count: Some(usize_to_u64( + agents_states + .values() + .filter(|state| { + matches!( + state.status, + CollabAgentStatus::Errored + | CollabAgentStatus::Shutdown + | CollabAgentStatus::NotFound + ) + }) + .count(), + )), + }, + }, + )) + } + ThreadItem::WebSearch { id, query, action } => { + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + "web_search".to_string(), + ToolItemOutcome { + terminal_status: ToolItemTerminalStatus::Completed, + failure_kind: None, + execution_duration_ms: None, + }, + context, + ); + Some(TrackEventRequest::WebSearch(CodexWebSearchEventRequest { + event_type: "codex_web_search_event", + event_params: CodexWebSearchEventParams { + base, + web_search_action: action.as_ref().map(web_search_action_kind), + query_present: !query.trim().is_empty(), + query_count: web_search_query_count(query, action.as_ref()), + }, + })) + } + ThreadItem::ImageGeneration { + id, + status, + revised_prompt, + saved_path, + .. + } => { + let (terminal_status, failure_kind) = image_generation_outcome(status.as_str()); + let base = tool_item_base( + thread_id, + turn_id, + id.clone(), + "image_generation".to_string(), + ToolItemOutcome { + terminal_status, + failure_kind, + execution_duration_ms: None, + }, + context, + ); + Some(TrackEventRequest::ImageGeneration( + CodexImageGenerationEventRequest { + event_type: "codex_image_generation_event", + event_params: CodexImageGenerationEventParams { + base, + revised_prompt_present: revised_prompt.is_some(), + saved_path_present: saved_path.is_some(), + }, + }, + )) + } + _ => None, + } +} + +struct ToolItemOutcome { + terminal_status: ToolItemTerminalStatus, + failure_kind: Option, + execution_duration_ms: Option, +} + +#[derive(Default)] +struct CommandActionCounts { + total: u64, + read: u64, + list_files: u64, + search: u64, + unknown: u64, +} + +fn command_action_counts(command_actions: &[CommandAction]) -> CommandActionCounts { + let mut counts = CommandActionCounts { + total: usize_to_u64(command_actions.len()), + ..Default::default() + }; + for action in command_actions { + match action { + CommandAction::Read { .. } => counts.read += 1, + CommandAction::ListFiles { .. } => counts.list_files += 1, + CommandAction::Search { .. } => counts.search += 1, + CommandAction::Unknown { .. } => counts.unknown += 1, + } + } + counts +} + +#[derive(Clone, Copy)] +struct ToolItemContext<'a> { + started_at_ms: u64, + completed_at_ms: u64, + connection_state: &'a ConnectionState, + thread_metadata: &'a ThreadMetadataState, +} + +fn tool_item_base( + thread_id: &str, + turn_id: &str, + item_id: String, + tool_name: String, + outcome: ToolItemOutcome, + context: ToolItemContext<'_>, +) -> CodexToolItemEventBase { + let thread_metadata = context.thread_metadata; + CodexToolItemEventBase { + thread_id: thread_id.to_string(), + turn_id: turn_id.to_string(), + item_id, + app_server_client: context.connection_state.app_server_client.clone(), + runtime: context.connection_state.runtime.clone(), + thread_source: thread_metadata.thread_source.map(ThreadSource::as_str), + subagent_source: thread_metadata.subagent_source.clone(), + parent_thread_id: thread_metadata.parent_thread_id.clone(), + tool_name, + started_at_ms: context.started_at_ms, + completed_at_ms: context.completed_at_ms, + // duration_ms reflects item lifecycle observed by app-server. For web + // search and image generation in particular, that can be narrower than + // full upstream execution time. + duration_ms: observed_duration_ms(context.started_at_ms, context.completed_at_ms), + execution_duration_ms: outcome.execution_duration_ms, + review_count: 0, + guardian_review_count: 0, + user_review_count: 0, + final_approval_outcome: ToolItemFinalApprovalOutcome::Unknown, + terminal_status: outcome.terminal_status, + failure_kind: outcome.failure_kind, + requested_additional_permissions: false, + requested_network_access: false, + } +} + +fn observed_duration_ms(started_at_ms: u64, completed_at_ms: u64) -> Option { + completed_at_ms.checked_sub(started_at_ms) +} + +fn command_execution_tool_name(source: CommandExecutionSource) -> &'static str { + match source { + CommandExecutionSource::UnifiedExecStartup + | CommandExecutionSource::UnifiedExecInteraction => "unified_exec", + CommandExecutionSource::UserShell => "user_shell", + CommandExecutionSource::Agent => "shell", + } +} + +fn command_execution_outcome( + status: &CommandExecutionStatus, +) -> Option<(ToolItemTerminalStatus, Option)> { + match status { + CommandExecutionStatus::InProgress => None, + CommandExecutionStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)), + CommandExecutionStatus::Failed => Some(( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + )), + CommandExecutionStatus::Declined => Some(( + ToolItemTerminalStatus::Rejected, + Some(ToolItemFailureKind::ApprovalDenied), + )), + } +} + +fn patch_apply_outcome( + status: &PatchApplyStatus, +) -> Option<(ToolItemTerminalStatus, Option)> { + match status { + PatchApplyStatus::InProgress => None, + PatchApplyStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)), + PatchApplyStatus::Failed => Some(( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + )), + PatchApplyStatus::Declined => Some(( + ToolItemTerminalStatus::Rejected, + Some(ToolItemFailureKind::ApprovalDenied), + )), + } +} + +fn mcp_tool_call_outcome( + status: &McpToolCallStatus, +) -> Option<(ToolItemTerminalStatus, Option)> { + match status { + McpToolCallStatus::InProgress => None, + McpToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)), + McpToolCallStatus::Failed => Some(( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + )), + } +} + +fn dynamic_tool_call_outcome( + status: &DynamicToolCallStatus, +) -> Option<(ToolItemTerminalStatus, Option)> { + match status { + DynamicToolCallStatus::InProgress => None, + DynamicToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)), + DynamicToolCallStatus::Failed => Some(( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + )), + } +} + +fn collab_tool_call_outcome( + status: &CollabAgentToolCallStatus, +) -> Option<(ToolItemTerminalStatus, Option)> { + match status { + CollabAgentToolCallStatus::InProgress => None, + CollabAgentToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)), + CollabAgentToolCallStatus::Failed => Some(( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + )), + } +} + +fn image_generation_outcome(status: &str) -> (ToolItemTerminalStatus, Option) { + match status { + "failed" | "error" => ( + ToolItemTerminalStatus::Failed, + Some(ToolItemFailureKind::ToolError), + ), + _ => (ToolItemTerminalStatus::Completed, None), + } +} + +fn collab_agent_tool_name(tool: &CollabAgentTool) -> &'static str { + match tool { + CollabAgentTool::SpawnAgent => "spawn_agent", + CollabAgentTool::SendInput => "send_input", + CollabAgentTool::ResumeAgent => "resume_agent", + CollabAgentTool::Wait => "wait_agent", + CollabAgentTool::CloseAgent => "close_agent", + } +} + +#[derive(Default)] +struct FileChangeCounts { + add: u64, + update: u64, + delete: u64, + move_: u64, +} + +fn file_change_counts(changes: &[codex_app_server_protocol::FileUpdateChange]) -> FileChangeCounts { + let mut counts = FileChangeCounts::default(); + for change in changes { + match &change.kind { + PatchChangeKind::Add => counts.add += 1, + PatchChangeKind::Delete => counts.delete += 1, + PatchChangeKind::Update { move_path: Some(_) } => counts.move_ += 1, + PatchChangeKind::Update { move_path: None } => counts.update += 1, + } + } + counts +} + +#[derive(Clone, Copy)] +struct DynamicContentCounts { + total: u64, + text: u64, + image: u64, +} + +fn dynamic_content_counts(items: &[DynamicToolCallOutputContentItem]) -> DynamicContentCounts { + let mut text = 0; + let mut image = 0; + for item in items { + match item { + DynamicToolCallOutputContentItem::InputText { .. } => text += 1, + DynamicToolCallOutputContentItem::InputImage { .. } => image += 1, + } + } + DynamicContentCounts { + total: usize_to_u64(items.len()), + text, + image, + } +} + +fn web_search_action_kind(action: &WebSearchAction) -> WebSearchActionKind { + match action { + WebSearchAction::Search { .. } => WebSearchActionKind::Search, + WebSearchAction::OpenPage { .. } => WebSearchActionKind::OpenPage, + WebSearchAction::FindInPage { .. } => WebSearchActionKind::FindInPage, + WebSearchAction::Other => WebSearchActionKind::Other, + } +} + +fn web_search_query_count(query: &str, action: Option<&WebSearchAction>) -> Option { + match action { + Some(WebSearchAction::Search { query, queries }) => queries + .as_ref() + .map(|queries| usize_to_u64(queries.len())) + .or_else(|| query.as_ref().map(|_| 1)), + Some(WebSearchAction::OpenPage { .. }) + | Some(WebSearchAction::FindInPage { .. }) + | Some(WebSearchAction::Other) => None, + None => (!query.trim().is_empty()).then_some(1), + } +} + fn codex_turn_event_params( app_server_client: CodexAppServerClientMetadata, runtime: CodexRuntimeMetadata, diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index e2d8fd587d..d9ff9d1551 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -10,7 +10,6 @@ use crate::thread_state::TurnSummary; use crate::thread_state::resolve_server_request_on_thread_listener; use crate::thread_status::ThreadWatchActiveGuard; use crate::thread_status::ThreadWatchManager; -use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile; use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo; @@ -137,7 +136,6 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id: ThreadId, conversation: Arc, thread_manager: Arc, - analytics_events_client: Option, outgoing: ThreadScopedOutgoingMessageSender, thread_state: Arc>, thread_watch_manager: ThreadWatchManager, @@ -175,10 +173,6 @@ pub(crate) async fn apply_bespoke_event_handling( thread_id: conversation_id.to_string(), turn, }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client - .track_notification(ServerNotification::TurnStarted(notification.clone())); - } outgoing .send_server_notification(ServerNotification::TurnStarted(notification)) .await; @@ -195,7 +189,6 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id, event_turn_id, turn_complete_event, - analytics_events_client.as_ref(), &outgoing, &thread_state, ) @@ -237,10 +230,6 @@ pub(crate) async fn apply_bespoke_event_handling( thread_id: Some(conversation_id.to_string()), message: warning_event.message, }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client - .track_notification(ServerNotification::Warning(notification.clone())); - } outgoing .send_server_notification(ServerNotification::Warning(notification)) .await; @@ -250,10 +239,6 @@ pub(crate) async fn apply_bespoke_event_handling( thread_id: conversation_id.to_string(), message: warning_event.message, }; - if let Some(analytics_events_client) = analytics_events_client.as_ref() { - analytics_events_client - .track_notification(ServerNotification::GuardianWarning(notification.clone())); - } outgoing .send_server_notification(ServerNotification::GuardianWarning(notification)) .await; @@ -1138,7 +1123,6 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id, event_turn_id, turn_aborted_event, - analytics_events_client.as_ref(), &outgoing, &thread_state, ) @@ -1291,7 +1275,6 @@ async fn emit_turn_completed_with_status( conversation_id: ThreadId, event_turn_id: String, turn_completion_metadata: TurnCompletionMetadata, - analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, ) { let notification = TurnCompletedNotification { @@ -1307,10 +1290,6 @@ async fn emit_turn_completed_with_status( duration_ms: turn_completion_metadata.duration_ms, }, }; - if let Some(analytics_events_client) = analytics_events_client { - analytics_events_client - .track_notification(ServerNotification::TurnCompleted(notification.clone())); - } outgoing .send_server_notification(ServerNotification::TurnCompleted(notification)) .await; @@ -1474,7 +1453,6 @@ async fn handle_turn_complete( conversation_id: ThreadId, event_turn_id: String, turn_complete_event: TurnCompleteEvent, - analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { @@ -1495,7 +1473,6 @@ async fn handle_turn_complete( completed_at: turn_complete_event.completed_at, duration_ms: turn_complete_event.duration_ms, }, - analytics_events_client, outgoing, ) .await; @@ -1505,7 +1482,6 @@ async fn handle_turn_interrupted( conversation_id: ThreadId, event_turn_id: String, turn_aborted_event: TurnAbortedEvent, - analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { @@ -1521,7 +1497,6 @@ async fn handle_turn_interrupted( completed_at: turn_aborted_event.completed_at, duration_ms: turn_aborted_event.duration_ms, }, - analytics_events_client, outgoing, ) .await; @@ -2095,7 +2070,6 @@ mod tests { use codex_app_server_protocol::GuardianApprovalReviewStatus; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::TurnPlanStepStatus; - use codex_login::AuthManager; use codex_login::CodexAuth; use codex_protocol::items::HookPromptFragment; use codex_protocol::items::build_hook_prompt_message; @@ -2301,7 +2275,6 @@ mod tests { outgoing: ThreadScopedOutgoingMessageSender, thread_state: Arc>, thread_watch_manager: ThreadWatchManager, - analytics_events_client: AnalyticsEventsClient, } impl GuardianAssessmentTestContext { @@ -2315,7 +2288,6 @@ mod tests { self.conversation_id, self.conversation.clone(), self.thread_manager.clone(), - Some(self.analytics_events_client.clone()), self.outgoing.clone(), self.thread_state.clone(), self.thread_watch_manager.clone(), @@ -2649,13 +2621,6 @@ mod tests { outgoing: outgoing.clone(), thread_state: thread_state.clone(), thread_watch_manager: thread_watch_manager.clone(), - analytics_events_client: AnalyticsEventsClient::new( - AuthManager::from_auth_for_testing( - CodexAuth::create_dummy_chatgpt_auth_for_testing(), - ), - "http://localhost".to_string(), - Some(false), - ), }; guardian_context @@ -3263,7 +3228,6 @@ mod tests { conversation_id, conversation, thread_manager, - /*analytics_events_client*/ None, outgoing, thread_state, thread_watch_manager, @@ -3320,7 +3284,6 @@ mod tests { conversation_id, event_turn_id.clone(), turn_complete_event(&event_turn_id), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3374,7 +3337,6 @@ mod tests { conversation_id, event_turn_id.clone(), turn_aborted_event(&event_turn_id), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3425,7 +3387,6 @@ mod tests { conversation_id, event_turn_id.clone(), turn_complete_event(&event_turn_id), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3660,7 +3621,6 @@ mod tests { conversation_a, a_turn1.clone(), turn_complete_event(&a_turn1), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3682,7 +3642,6 @@ mod tests { conversation_b, b_turn1.clone(), turn_complete_event(&b_turn1), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3694,7 +3653,6 @@ mod tests { conversation_a, a_turn2.clone(), turn_complete_event(&a_turn2), - /*analytics_events_client*/ None, &outgoing, &thread_state, ) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index a3c3877fdc..1ac635685f 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -391,7 +391,6 @@ impl MessageProcessor { auth_manager.clone(), Arc::clone(&thread_manager), outgoing.clone(), - analytics_events_client.clone(), arg0_paths.clone(), Arc::clone(&config), config_manager.clone(), diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 2807d2532e..a7420f8c78 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -140,6 +140,9 @@ impl ThreadScopedOutgoingMessageSender { } pub(crate) async fn send_server_notification(&self, notification: ServerNotification) { + self.outgoing + .analytics_events_client + .track_notification(notification.clone()); if self.connection_ids.is_empty() { return; } @@ -526,7 +529,7 @@ impl OutgoingMessageSender { targeted_connections = connection_ids.len(), "app-server event: {notification}" ); - let outgoing_message = OutgoingMessage::AppServerNotification(notification); + let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone()); if connection_ids.is_empty() { if let Err(err) = self .sender @@ -560,7 +563,7 @@ impl OutgoingMessageSender { notification: ServerNotification, ) { tracing::trace!("app-server event: {notification}"); - let outgoing_message = OutgoingMessage::AppServerNotification(notification); + let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone()); let (write_complete_tx, write_complete_rx) = oneshot::channel(); if let Err(err) = self .sender diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 81e09c7d5c..ef44a2b178 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -8,7 +8,6 @@ pub(super) struct ListenerTaskContext { pub(super) thread_state_manager: ThreadStateManager, pub(super) outgoing: Arc, pub(super) pending_thread_unloads: Arc>>, - pub(super) analytics_events_client: AnalyticsEventsClient, pub(super) thread_watch_manager: ThreadWatchManager, pub(super) thread_list_state_permit: Arc, pub(super) fallback_model_provider: String, @@ -239,7 +238,6 @@ pub(super) async fn ensure_listener_task_running( thread_manager, thread_state_manager, pending_thread_unloads, - analytics_events_client, thread_watch_manager, thread_list_state_permit, fallback_model_provider, @@ -315,7 +313,6 @@ pub(super) async fn ensure_listener_task_running( conversation_id, conversation.clone(), thread_manager.clone(), - Some(analytics_events_client.clone()), thread_outgoing, thread_state.clone(), thread_watch_manager.clone(), diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 783e53ccb0..4042197a76 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -305,7 +305,6 @@ pub(crate) struct ThreadRequestProcessor { pub(super) auth_manager: Arc, pub(super) thread_manager: Arc, pub(super) outgoing: Arc, - pub(super) analytics_events_client: AnalyticsEventsClient, pub(super) arg0_paths: Arg0DispatchPaths, pub(super) config: Arc, pub(super) config_manager: ConfigManager, @@ -325,7 +324,6 @@ impl ThreadRequestProcessor { auth_manager: Arc, thread_manager: Arc, outgoing: Arc, - analytics_events_client: AnalyticsEventsClient, arg0_paths: Arg0DispatchPaths, config: Arc, config_manager: ConfigManager, @@ -341,7 +339,6 @@ impl ThreadRequestProcessor { auth_manager, thread_manager, outgoing, - analytics_events_client, arg0_paths, config, config_manager, @@ -741,7 +738,6 @@ impl ThreadRequestProcessor { thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), pending_thread_unloads: Arc::clone(&self.pending_thread_unloads), - analytics_events_client: self.analytics_events_client.clone(), thread_watch_manager: self.thread_watch_manager.clone(), thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), @@ -839,7 +835,6 @@ impl ThreadRequestProcessor { thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), pending_thread_unloads: Arc::clone(&self.pending_thread_unloads), - analytics_events_client: self.analytics_events_client.clone(), thread_watch_manager: self.thread_watch_manager.clone(), thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index b207fde940..bdc5847b0d 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -1083,7 +1083,6 @@ impl TurnRequestProcessor { thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), pending_thread_unloads: Arc::clone(&self.pending_thread_unloads), - analytics_events_client: self.analytics_events_client.clone(), thread_watch_manager: self.thread_watch_manager.clone(), thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(),