diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 8d67b2f2c0..9a8fd367b5 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -432,7 +432,7 @@ async fn ingest_rejected_turn_steer( .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -492,7 +492,7 @@ async fn ingest_turn_prerequisites( ingest_initialize(reducer, out).await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-2", /*ephemeral*/ false, "gpt-5", @@ -506,7 +506,7 @@ async fn ingest_turn_prerequisites( reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -516,7 +516,7 @@ async fn ingest_turn_prerequisites( .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -899,7 +899,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-no-client", @@ -943,7 +943,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response( "thread-1", /*ephemeral*/ true, "gpt-5", @@ -998,7 +998,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(ClientRequest::ThreadArchive { @@ -1013,7 +1013,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -1035,7 +1035,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() { ingest_initialize(&mut reducer, &mut events).await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(ClientResponse::ThreadArchive { request_id: RequestId::Integer(9), @@ -1081,7 +1081,7 @@ async fn compaction_event_ingests_custom_fact() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response_with_source( "thread-1", @@ -1192,7 +1192,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-guardian", @@ -1962,7 +1962,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -1974,7 +1974,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2116,7 +2116,7 @@ async fn turn_start_error_response_discards_pending_start_request() { ingest_initialize(&mut reducer, &mut out).await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -2140,7 +2140,7 @@ async fn turn_start_error_response_discards_pending_start_request() { // failed turn/start request and attach request-scoped connection metadata. reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -2257,7 +2257,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -2269,7 +2269,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2279,7 +2279,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(5), request: Box::new(sample_turn_steer_request( @@ -2303,7 +2303,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(6), request: Box::new(sample_turn_steer_request( @@ -2315,7 +2315,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)), }, diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index e836f466a0..4470696a39 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -28,13 +28,14 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; use std::collections::HashSet; use std::sync::Arc; use std::sync::Mutex; -use std::sync::Weak; use std::time::Duration; use tokio::sync::mpsc; @@ -51,51 +52,32 @@ pub(crate) struct AnalyticsEventsQueue { #[derive(Clone)] pub struct AnalyticsEventsClient { - queue: AnalyticsEventsQueue, - analytics_enabled: Option, + queue: Option, } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Debug, Clone, Copy)] pub enum AuthManagerRetention { Strong, Weak, } -enum RetainedAuthManager { - Strong(Arc), - Weak(Weak), -} - -impl RetainedAuthManager { - fn upgrade(&self) -> Option> { - match self { - Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)), - Self::Weak(auth_manager) => auth_manager.upgrade(), - } - } -} - impl AnalyticsEventsQueue { pub(crate) fn new( auth_manager: Arc, base_url: String, retention: AuthManagerRetention, ) -> Self { - let auth_manager = match retention { - AuthManagerRetention::Strong => RetainedAuthManager::Strong(auth_manager), - AuthManagerRetention::Weak => RetainedAuthManager::Weak(Arc::downgrade(&auth_manager)), - }; - Self::spawn(auth_manager, base_url) - } - - fn spawn(auth_manager: RetainedAuthManager, base_url: String) -> Self { let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); + let auth_manager = match retention { + AuthManagerRetention::Strong => AuthManagerHandle::Strong(auth_manager), + AuthManagerRetention::Weak => AuthManagerHandle::Weak(Arc::downgrade(&auth_manager)), + }; tokio::spawn(async move { let mut reducer = AnalyticsReducer::default(); while let Some(input) = receiver.recv().await { let mut events = Vec::new(); reducer.ingest(input, &mut events).await; - let Some(auth_manager) = auth_manager.upgrade() else { + let Some(auth_manager) = auth_manager.get() else { break; }; send_track_events(&auth_manager, &base_url, events).await; @@ -149,29 +131,40 @@ impl AnalyticsEventsQueue { } } +enum AuthManagerHandle { + Strong(Arc), + Weak(std::sync::Weak), +} + +impl AuthManagerHandle { + fn get(&self) -> Option> { + match self { + Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)), + Self::Weak(auth_manager) => auth_manager.upgrade(), + } + } +} + impl AnalyticsEventsClient { pub fn new( auth_manager: Arc, base_url: String, analytics_enabled: Option, - retention: AuthManagerRetention, + auth_manager_retention: AuthManagerRetention, ) -> Self { Self { - queue: AnalyticsEventsQueue::new(auth_manager, base_url, retention), - analytics_enabled, + queue: (analytics_enabled != Some(false)).then(|| { + AnalyticsEventsQueue::new( + Arc::clone(&auth_manager), + base_url, + auth_manager_retention, + ) + }), } } pub fn disabled() -> Self { - let (sender, _receiver) = mpsc::channel(1); - Self { - queue: AnalyticsEventsQueue { - sender, - app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), - plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), - }, - analytics_enabled: Some(false), - } + Self { queue: None } } pub fn track_skill_invocations( @@ -235,7 +228,7 @@ impl AnalyticsEventsClient { if !tracks_client_request(&request) { return; } - self.record_fact(AnalyticsFact::Request { + self.record_fact(AnalyticsFact::ClientRequest { connection_id, request_id, request: Box::new(request), @@ -243,7 +236,10 @@ impl AnalyticsEventsClient { } pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { - if !self.queue.should_enqueue_app_used(&tracking, &app) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_app_used(&tracking, &app) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed( @@ -258,7 +254,10 @@ impl AnalyticsEventsClient { } pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { - if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_plugin_used(&tracking, &plugin) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed( @@ -321,17 +320,16 @@ impl AnalyticsEventsClient { } pub(crate) fn record_fact(&self, input: AnalyticsFact) { - if self.analytics_enabled == Some(false) { - return; + if let Some(queue) = self.queue.as_ref() { + queue.try_send(input); } - self.queue.try_send(input); } pub fn track_response(&self, connection_id: u64, response: ClientResponse) { if !tracks_client_response(&response) { return; } - self.record_fact(AnalyticsFact::Response { + self.record_fact(AnalyticsFact::ClientResponse { connection_id, response: Box::new(response), }); @@ -371,6 +369,20 @@ impl AnalyticsEventsClient { pub fn track_notification(&self, notification: ServerNotification) { self.record_fact(AnalyticsFact::Notification(Box::new(notification))); } + + pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) { + self.record_fact(AnalyticsFact::ServerRequest { + connection_id, + request: Box::new(request), + }); + } + + pub fn track_server_response(&self, connection_id: u64, response: ServerResponse) { + self.record_fact(AnalyticsFact::ServerResponse { + connection_id, + response: Box::new(response), + }); + } } fn tracks_client_request(request: &ClientRequest) -> bool { diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 26fc7b295a..1ac212a9d5 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -8,6 +8,8 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::config_types::ModeKind; @@ -273,12 +275,12 @@ pub(crate) enum AnalyticsFact { runtime: CodexRuntimeMetadata, rpc_transport: AppServerRpcTransport, }, - Request { + ClientRequest { connection_id: u64, request_id: RequestId, request: Box, }, - Response { + ClientResponse { connection_id: u64, response: Box, }, @@ -293,6 +295,14 @@ pub(crate) enum AnalyticsFact { error: JSONRPCErrorError, error_type: Option, }, + ServerRequest { + connection_id: u64, + request: Box, + }, + ServerResponse { + connection_id: u64, + response: Box, + }, Notification(Box), // Facts that do not naturally exist on the app-server protocol surface, or // would require non-trivial protocol reshaping on this branch. diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 37760179c6..9d6222d42b 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -171,14 +171,14 @@ impl AnalyticsReducer { rpc_transport, ); } - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id, request_id, request, } => { self.ingest_request(connection_id, request_id, *request); } - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id, response, } => { @@ -204,6 +204,14 @@ impl AnalyticsReducer { AnalyticsFact::Notification(notification) => { self.ingest_notification(*notification, out); } + AnalyticsFact::ServerRequest { + connection_id: _connection_id, + request: _request, + } => {} + AnalyticsFact::ServerResponse { + connection_id: _connection_id, + response: _response, + } => {} AnalyticsFact::Custom(input) => match input { CustomAnalyticsFact::SubAgentThreadStarted(input) => { self.ingest_subagent_thread_started(input, out); diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index f5afe91bb2..d67f05eafa 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -803,6 +803,24 @@ macro_rules! server_request_definitions { $(Self::$variant { request_id, .. } => request_id,)* } } + + pub fn response_from_result( + &self, + result: &crate::Result, + ) -> serde_json::Result { + match self { + $( + Self::$variant { request_id, .. } => { + let response = + <$response as serde::Deserialize>::deserialize(result)?; + Ok(ServerResponse::$variant { + request_id: request_id.clone(), + response, + }) + } + )* + } + } } /// Typed response from the client to the server. diff --git a/codex-rs/app-server/src/analytics_events.rs b/codex-rs/app-server/src/analytics_events.rs new file mode 100644 index 0000000000..c41bfc8b31 --- /dev/null +++ b/codex-rs/app-server/src/analytics_events.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +use codex_analytics::AnalyticsEventsClient; +use codex_analytics::AuthManagerRetention; +use codex_core::config::Config; +use codex_login::AuthManager; + +pub(crate) fn analytics_events_client_from_config( + auth_manager: Arc, + config: &Config, +) -> AnalyticsEventsClient { + AnalyticsEventsClient::new( + auth_manager, + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + AuthManagerRetention::Weak, + ) +} diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 13c634991c..27ac7453a5 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; @@ -66,9 +67,7 @@ use crate::transport::CHANNEL_CAPACITY; use crate::transport::ConnectionOrigin; use crate::transport::OutboundConnectionState; use crate::transport::route_outgoing_envelope; -use codex_analytics::AnalyticsEventsClient; use codex_analytics::AppServerRpcTransport; -use codex_analytics::AuthManagerRetention; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConfigWarningNotification; @@ -370,18 +369,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let auth_manager = AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env) .await; - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - args.config - .chatgpt_base_url - .trim_end_matches('/') - .to_string(), - args.config.analytics_enabled, - AuthManagerRetention::Weak, - ); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref()); let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( outgoing_tx, - analytics_events_client, + analytics_events_client.clone(), )); let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); @@ -419,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let mut processor_handle = tokio::spawn(async move { let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: Arc::clone(&processor_outgoing), + analytics_events_client, arg0_paths: args.arg0_paths, config: args.config, config_manager, @@ -577,7 +570,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => { outgoing_message_sender - .notify_client_response(request_id, result) + .notify_client_response( + IN_PROCESS_CONNECTION_ID, + request_id, + result, + ) .await; } Some(InProcessClientMessage::ServerRequestError { request_id, error }) => { diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 6b63a0114e..3aecfd6120 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -1,7 +1,5 @@ #![deny(clippy::print_stdout, clippy::print_stderr)] -use codex_analytics::AnalyticsEventsClient; -use codex_analytics::AuthManagerRetention; use codex_arg0::Arg0DispatchPaths; use codex_config::ConfigLayerStackOrdering; use codex_config::LoaderOverrides; @@ -21,6 +19,7 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; @@ -69,6 +68,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; use tracing_subscriber::util::SubscriberInitExt; +mod analytics_events; mod app_server_tracing; mod bespoke_event_handling; mod codex_message_processor; @@ -711,19 +711,18 @@ pub async fn run_main_with_transport_options( }); let processor_handle = tokio::spawn({ - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - AuthManagerRetention::Weak, - ); + let outbound_control_tx = outbound_control_tx; + let auth_manager = + AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), &config); let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( outgoing_tx, - analytics_events_client, + analytics_events_client.clone(), )); - let outbound_control_tx = outbound_control_tx; let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: outgoing_message_sender, + analytics_events_client, arg0_paths, config: Arc::new(config), config_manager, @@ -732,7 +731,7 @@ pub async fn run_main_with_transport_options( log_db, config_warnings, session_source, - auth_manager: Arc::clone(&auth_manager), + auth_manager, rpc_transport: analytics_rpc_transport(&transport), remote_control_handle: Some(remote_control_handle), plugin_startup_tasks: runtime_options.plugin_startup_tasks, @@ -890,7 +889,7 @@ pub async fn run_main_with_transport_options( warn!("dropping response from unknown connection: {connection_id:?}"); continue; } - processor.process_response(response).await; + processor.process_response(connection_id, response).await; } JSONRPCMessage::Notification(notification) => { if !connections.contains_key(&connection_id) { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index e9b473c9f3..49f88f1824 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -25,7 +25,6 @@ use async_trait::async_trait; use axum::http::HeaderValue; use codex_analytics::AnalyticsEventsClient; use codex_analytics::AppServerRpcTransport; -use codex_analytics::AuthManagerRetention; use codex_app_server_protocol::AppListUpdatedNotification; use codex_app_server_protocol::AuthMode as LoginAuthMode; use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; @@ -239,6 +238,7 @@ impl ConnectionSessionState { pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) config_manager: ConfigManager, @@ -259,6 +259,7 @@ impl MessageProcessor { pub(crate) fn new(args: MessageProcessorArgs) -> Self { let MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths, config, config_manager, @@ -275,12 +276,6 @@ impl MessageProcessor { auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge { outgoing: outgoing.clone(), })); - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - AuthManagerRetention::Strong, - ); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), @@ -557,10 +552,16 @@ impl MessageProcessor { } /// Handle a standalone JSON-RPC response originating from the peer. - pub(crate) async fn process_response(&self, response: JSONRPCResponse) { + pub(crate) async fn process_response( + &self, + connection_id: ConnectionId, + response: JSONRPCResponse, + ) { tracing::info!("<- response: {:?}", response); let JSONRPCResponse { id, result, .. } = response; - self.outgoing.notify_client_response(id, result).await + self.outgoing + .notify_client_response(connection_id, id, result) + .await } /// Handle an error object received from the peer. diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index b7ea51bd59..5b1eba55a9 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -1,6 +1,7 @@ use super::ConnectionSessionState; use super::MessageProcessor; use super::MessageProcessorArgs; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingMessageSender; @@ -264,13 +265,14 @@ async fn build_test_processor( mpsc::Receiver, ) { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); - let analytics_events_client = codex_analytics::AnalyticsEventsClient::disabled(); + let auth_manager = + AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await; + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref()); let outgoing = Arc::new(OutgoingMessageSender::new( outgoing_tx, analytics_events_client.clone(), )); - let auth_manager = - AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await; let config_manager = ConfigManager::new( config.codex_home.to_path_buf(), Vec::new(), @@ -281,6 +283,7 @@ async fn build_test_processor( ); let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths: Arg0DispatchPaths::default(), config, config_manager, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 89dcda4d13..7906f3018e 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -134,6 +134,7 @@ struct PendingCallbackEntry { callback: oneshot::Sender, thread_id: Option, request: ServerRequest, + track_server_response: bool, } impl ThreadScopedOutgoingMessageSender { @@ -302,11 +303,12 @@ impl OutgoingMessageSender { callback: tx_approve, thread_id, request: request.clone(), + track_server_response: connection_ids.is_some(), }, ); } - let outgoing_message = OutgoingMessage::Request(request); + let outgoing_message = OutgoingMessage::Request(request.clone()); let send_result = match connection_ids { None => { self.sender @@ -329,6 +331,9 @@ impl OutgoingMessageSender { { send_error = Some(err); break; + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -357,21 +362,35 @@ impl OutgoingMessageSender { .sender .send(OutgoingEnvelope::ToConnection { connection_id, - message: OutgoingMessage::Request(request), + message: OutgoingMessage::Request(request.clone()), write_complete_tx: None, }) .await { warn!("failed to resend request to client: {err:?}"); + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request); } } } - pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) { + pub(crate) async fn notify_client_response( + &self, + connection_id: ConnectionId, + id: RequestId, + result: Result, + ) { let entry = self.take_request_callback(&id).await; match entry { Some((id, entry)) => { + if entry.track_server_response + && let Ok(response) = entry.request.response_from_result(&result) + { + self.analytics_events_client + .track_server_response(connection_id.0, response); + } if let Err(err) = entry.callback.send(Ok(result)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } @@ -687,6 +706,8 @@ mod tests { use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::AuthMode; + use codex_app_server_protocol::CommandExecutionApprovalDecision; + use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::FileChangeRequestApprovalParams; @@ -697,6 +718,7 @@ mod tests { use codex_app_server_protocol::ModelVerificationNotification; use codex_app_server_protocol::RateLimitSnapshot; use codex_app_server_protocol::RateLimitWindow; + use codex_app_server_protocol::ServerResponse; use codex_app_server_protocol::ToolRequestUserInputParams; use codex_protocol::ThreadId; use pretty_assertions::assert_eq; @@ -898,6 +920,47 @@ mod tests { ); } + #[test] + fn server_request_response_from_result_decodes_typed_response() { + let request = ServerRequest::CommandExecutionRequestApproval { + request_id: RequestId::Integer(7), + params: CommandExecutionRequestApprovalParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item_id: "item-1".to_string(), + approval_id: None, + reason: None, + network_approval_context: None, + command: Some("echo hi".to_string()), + cwd: None, + command_actions: None, + additional_permissions: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + available_decisions: None, + }, + }; + + let response = request + .response_from_result(&json!({ + "decision": "acceptForSession", + })) + .expect("decode typed server response"); + + let ServerResponse::CommandExecutionRequestApproval { + request_id, + response, + } = response + else { + panic!("expected command execution approval response"); + }; + assert_eq!(request_id, RequestId::Integer(7)); + assert_eq!( + response.decision, + CommandExecutionApprovalDecision::AcceptForSession + ); + } + #[test] fn verify_model_verification_notification_serialization() { let notification = ServerNotification::ModelVerification(ModelVerificationNotification {