From 840f2711dffc4567d55556906b090d86934c9a10 Mon Sep 17 00:00:00 2001 From: rhan-oai Date: Mon, 20 Apr 2026 17:03:51 -0700 Subject: [PATCH 1/9] [codex-analytics] ingest server requests and responses --- .../analytics/src/analytics_client_tests.rs | 34 +++---- codex-rs/analytics/src/client.rs | 46 ++++++--- codex-rs/analytics/src/facts.rs | 14 ++- codex-rs/analytics/src/reducer.rs | 12 ++- .../src/protocol/common.rs | 17 ++++ codex-rs/app-server/src/analytics_events.rs | 16 ++++ .../app-server/src/bespoke_event_handling.rs | 65 ++++++++++--- .../app-server/src/codex_message_processor.rs | 5 +- codex-rs/app-server/src/command_exec.rs | 15 ++- codex-rs/app-server/src/fs_watch.rs | 5 +- codex-rs/app-server/src/in_process.rs | 19 +++- codex-rs/app-server/src/lib.rs | 12 ++- codex-rs/app-server/src/message_processor.rs | 17 ++-- .../src/message_processor/tracing_tests.rs | 9 +- codex-rs/app-server/src/outgoing_message.rs | 96 ++++++++++++++++--- codex-rs/app-server/src/thread_status.rs | 2 + 16 files changed, 309 insertions(+), 75 deletions(-) create mode 100644 codex-rs/app-server/src/analytics_events.rs diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 03592c9ff6..c4be7a8ab8 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -427,7 +427,7 @@ async fn ingest_rejected_turn_steer( .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -487,7 +487,7 @@ async fn ingest_turn_prerequisites( ingest_initialize(reducer, out).await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-2", /*ephemeral*/ false, "gpt-5", @@ -501,7 +501,7 @@ async fn ingest_turn_prerequisites( reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -511,7 +511,7 @@ async fn ingest_turn_prerequisites( .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -863,7 +863,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-no-client", @@ -907,7 +907,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response( "thread-1", /*ephemeral*/ true, "gpt-5", @@ -987,7 +987,7 @@ async fn compaction_event_ingests_custom_fact() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response_with_source( "thread-1", @@ -1098,7 +1098,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-guardian", @@ -1868,7 +1868,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -1880,7 +1880,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2022,7 +2022,7 @@ async fn turn_start_error_response_discards_pending_start_request() { ingest_initialize(&mut reducer, &mut out).await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -2046,7 +2046,7 @@ async fn turn_start_error_response_discards_pending_start_request() { // failed turn/start request and attach request-scoped connection metadata. reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -2163,7 +2163,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -2175,7 +2175,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2185,7 +2185,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(5), request: Box::new(sample_turn_steer_request( @@ -2209,7 +2209,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(6), request: Box::new(sample_turn_steer_request( @@ -2221,7 +2221,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)), }, diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index a3a20231f0..10a362e68f 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -27,6 +27,8 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; @@ -49,8 +51,7 @@ pub(crate) struct AnalyticsEventsQueue { #[derive(Clone)] pub struct AnalyticsEventsClient { - queue: AnalyticsEventsQueue, - analytics_enabled: Option, + queue: Option, } impl AnalyticsEventsQueue { @@ -119,11 +120,15 @@ impl AnalyticsEventsClient { analytics_enabled: Option, ) -> Self { Self { - queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url), - analytics_enabled, + queue: (analytics_enabled != Some(false)) + .then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)), } } + pub fn disabled() -> Self { + Self { queue: None } + } + pub fn track_skill_invocations( &self, tracking: TrackEventsContext, @@ -182,7 +187,7 @@ impl AnalyticsEventsClient { } pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) { - self.record_fact(AnalyticsFact::Request { + self.record_fact(AnalyticsFact::ClientRequest { connection_id, request_id, request: Box::new(request), @@ -190,7 +195,10 @@ impl AnalyticsEventsClient { } pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { - if !self.queue.should_enqueue_app_used(&tracking, &app) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_app_used(&tracking, &app) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed( @@ -205,7 +213,10 @@ impl AnalyticsEventsClient { } pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { - if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_plugin_used(&tracking, &plugin) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed( @@ -268,14 +279,13 @@ impl AnalyticsEventsClient { } pub(crate) fn record_fact(&self, input: AnalyticsFact) { - if self.analytics_enabled == Some(false) { - return; + if let Some(queue) = self.queue.as_ref() { + queue.try_send(input); } - self.queue.try_send(input); } pub fn track_response(&self, connection_id: u64, response: ClientResponse) { - self.record_fact(AnalyticsFact::Response { + self.record_fact(AnalyticsFact::ClientResponse { connection_id, response: Box::new(response), }); @@ -299,6 +309,20 @@ impl AnalyticsEventsClient { pub fn track_notification(&self, notification: ServerNotification) { self.record_fact(AnalyticsFact::Notification(Box::new(notification))); } + + pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) { + self.record_fact(AnalyticsFact::ServerRequest { + connection_id, + request: Box::new(request), + }); + } + + pub fn track_server_response(&self, connection_id: u64, response: ServerResponse) { + self.record_fact(AnalyticsFact::ServerResponse { + connection_id, + response: Box::new(response), + }); + } } async fn send_track_events( diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 1d371acb1c..2ba6268da7 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -7,6 +7,8 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::config_types::ModeKind; @@ -271,12 +273,12 @@ pub(crate) enum AnalyticsFact { runtime: CodexRuntimeMetadata, rpc_transport: AppServerRpcTransport, }, - Request { + ClientRequest { connection_id: u64, request_id: RequestId, request: Box, }, - Response { + ClientResponse { connection_id: u64, response: Box, }, @@ -286,6 +288,14 @@ pub(crate) enum AnalyticsFact { error: JSONRPCErrorError, error_type: Option, }, + ServerRequest { + connection_id: u64, + request: Box, + }, + ServerResponse { + connection_id: u64, + response: Box, + }, Notification(Box), // Facts that do not naturally exist on the app-server protocol surface, or // would require non-trivial protocol reshaping on this branch. diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index a6ce3fc831..b40664e12a 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -171,14 +171,14 @@ impl AnalyticsReducer { rpc_transport, ); } - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id, request_id, request, } => { self.ingest_request(connection_id, request_id, *request); } - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id, response, } => { @@ -195,6 +195,14 @@ impl AnalyticsReducer { AnalyticsFact::Notification(notification) => { self.ingest_notification(*notification, out); } + AnalyticsFact::ServerRequest { + connection_id: _connection_id, + request: _request, + } => {} + AnalyticsFact::ServerResponse { + connection_id: _connection_id, + response: _response, + } => {} AnalyticsFact::Custom(input) => match input { CustomAnalyticsFact::SubAgentThreadStarted(input) => { self.ingest_subagent_thread_started(input, out); diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 5c858a026a..492164a60f 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -678,6 +678,23 @@ macro_rules! server_request_definitions { $(Self::$variant { request_id, .. } => request_id,)* } } + + pub fn response_from_result( + &self, + result: crate::Result, + ) -> serde_json::Result { + match self { + $( + Self::$variant { request_id, .. } => { + let response = serde_json::from_value::<$response>(result)?; + Ok(ServerResponse::$variant { + request_id: request_id.clone(), + response, + }) + } + )* + } + } } /// Typed response from the client to the server. diff --git a/codex-rs/app-server/src/analytics_events.rs b/codex-rs/app-server/src/analytics_events.rs new file mode 100644 index 0000000000..24ed12d2ad --- /dev/null +++ b/codex-rs/app-server/src/analytics_events.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use codex_analytics::AnalyticsEventsClient; +use codex_core::config::Config; +use codex_login::AuthManager; + +pub(crate) fn analytics_events_client_from_config( + auth_manager: Arc, + config: &Config, +) -> AnalyticsEventsClient { + AnalyticsEventsClient::new( + auth_manager, + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ) +} diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 12129f7ee5..ee48da82ce 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -3350,7 +3350,10 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3419,7 +3422,10 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3509,7 +3515,10 @@ mod tests { let thread_state = new_thread_state(); let thread_watch_manager = ThreadWatchManager::new(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4080,7 +4089,10 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "complete1".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4144,7 +4156,10 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4192,7 +4207,10 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4234,7 +4252,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4288,7 +4309,10 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-123".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4377,7 +4401,10 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-456".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4450,7 +4477,10 @@ mod tests { let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4712,7 +4742,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4750,7 +4783,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4776,7 +4812,10 @@ mod tests { #[tokio::test] async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let conversation_id = ThreadId::new(); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index d6efa1050b..0d277530db 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -10686,7 +10686,10 @@ mod tests { let connection_id = ConnectionId(7); let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), vec![connection_id], diff --git a/codex-rs/app-server/src/command_exec.rs b/codex-rs/app-server/src/command_exec.rs index b72c84e906..1414cefb61 100644 --- a/codex-rs/app-server/src/command_exec.rs +++ b/codex-rs/app-server/src/command_exec.rs @@ -757,7 +757,10 @@ mod tests { let manager = CommandExecManager::default(); let err = manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: ConnectionRequestId { connection_id: ConnectionId(1), request_id: codex_app_server_protocol::RequestId::Integer(42), @@ -793,7 +796,10 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: request_id.clone(), process_id: Some("proc-99".to_string()), exec_request: windows_sandbox_exec_request(), @@ -843,7 +849,10 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: request_id.clone(), process_id: Some("proc-100".to_string()), exec_request: ExecRequest::new( diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index ff00051472..0d76096de0 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -234,7 +234,10 @@ mod tests { const OUTGOING_BUFFER: usize = 1; let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER); FsWatchManager::new_with_file_watcher( - Arc::new(OutgoingMessageSender::new(tx)), + Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), Arc::new(FileWatcher::noop()), ) } diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 729f6d04af..12c5baf3e8 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; @@ -365,7 +366,14 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let runtime_handle = tokio::spawn(async move { let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(channel_capacity); - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let auth_manager = + AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref()); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); let outbound_initialized = Arc::new(AtomicBool::new(false)); @@ -390,8 +398,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { }); let processor_outgoing = Arc::clone(&outgoing_message_sender); - let auth_manager = - AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env); let config_manager = ConfigManager::new( args.config.codex_home.to_path_buf(), args.cli_overrides, @@ -404,6 +410,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let mut processor_handle = tokio::spawn(async move { let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: Arc::clone(&processor_outgoing), + analytics_events_client, arg0_paths: args.arg0_paths, config: args.config, config_manager, @@ -561,7 +568,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => { outgoing_message_sender - .notify_client_response(request_id, result) + .notify_client_response( + IN_PROCESS_CONNECTION_ID, + request_id, + result, + ) .await; } Some(InProcessClientMessage::ServerRequestError { request_id, error }) => { diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index a2f35305ae..22bb5d409e 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; @@ -65,6 +66,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; use tracing_subscriber::util::SubscriberInitExt; +mod analytics_events; mod app_server_tracing; mod bespoke_event_handling; mod codex_message_processor; @@ -645,12 +647,18 @@ pub async fn run_main_with_transport( }); let processor_handle = tokio::spawn({ - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); let outbound_control_tx = outbound_control_tx; let auth_manager = AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), &config); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: outgoing_message_sender, + analytics_events_client, arg0_paths, config: Arc::new(config), config_manager, @@ -816,7 +824,7 @@ pub async fn run_main_with_transport( warn!("dropping response from unknown connection: {connection_id:?}"); continue; } - processor.process_response(response).await; + processor.process_response(connection_id, response).await; } JSONRPCMessage::Notification(notification) => { if !connections.contains_key(&connection_id) { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 57fa6e21e0..a257a37ad0 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -249,6 +249,7 @@ impl ConnectionSessionState { pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) config_manager: ConfigManager, @@ -268,6 +269,7 @@ impl MessageProcessor { pub(crate) fn new(args: MessageProcessorArgs) -> Self { let MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths, config, config_manager, @@ -283,11 +285,6 @@ impl MessageProcessor { auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge { outgoing: outgoing.clone(), })); - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), @@ -572,10 +569,16 @@ impl MessageProcessor { } /// Handle a standalone JSON-RPC response originating from the peer. - pub(crate) async fn process_response(&self, response: JSONRPCResponse) { + pub(crate) async fn process_response( + &self, + connection_id: ConnectionId, + response: JSONRPCResponse, + ) { tracing::info!("<- response: {:?}", response); let JSONRPCResponse { id, result, .. } = response; - self.outgoing.notify_client_response(id, result).await + self.outgoing + .notify_client_response(connection_id, id, result) + .await } /// Handle an error object received from the peer. diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index 04178c70bf..fe3b824a61 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -1,6 +1,7 @@ use super::ConnectionSessionState; use super::MessageProcessor; use super::MessageProcessorArgs; +use crate::analytics_events::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingMessageSender; @@ -263,9 +264,14 @@ fn build_test_processor( mpsc::Receiver, ) { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); let auth_manager = AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref()); + let outgoing = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let config_manager = ConfigManager::new( config.codex_home.to_path_buf(), Vec::new(), @@ -276,6 +282,7 @@ fn build_test_processor( ); let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths: Arg0DispatchPaths::default(), config, config_manager, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 3c5bef3ea4..11de41818e 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result; @@ -117,6 +118,7 @@ pub(crate) struct OutgoingMessageSender { /// We keep them here because this is where responses, errors, and /// disconnect cleanup all get handled. request_contexts: Mutex>, + analytics_events_client: AnalyticsEventsClient, } #[derive(Clone)] @@ -203,12 +205,16 @@ impl ThreadScopedOutgoingMessageSender { } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new( + sender: mpsc::Sender, + analytics_events_client: AnalyticsEventsClient, + ) -> Self { Self { next_server_request_id: AtomicI64::new(0), sender, request_id_to_callback: Mutex::new(HashMap::new()), request_contexts: Mutex::new(HashMap::new()), + analytics_events_client, } } @@ -298,7 +304,7 @@ impl OutgoingMessageSender { ); } - let outgoing_message = OutgoingMessage::Request(request); + let outgoing_message = OutgoingMessage::Request(request.clone()); let send_result = match connection_ids { None => { self.sender @@ -321,6 +327,9 @@ impl OutgoingMessageSender { { send_error = Some(err); break; + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -359,11 +368,20 @@ impl OutgoingMessageSender { } } - pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) { + pub(crate) async fn notify_client_response( + &self, + connection_id: ConnectionId, + id: RequestId, + result: Result, + ) { let entry = self.take_request_callback(&id).await; match entry { Some((id, entry)) => { + if let Ok(response) = entry.request.response_from_result(result.clone()) { + self.analytics_events_client + .track_server_response(connection_id.0, response); + } if let Err(err) = entry.callback.send(Ok(result)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } @@ -654,6 +672,8 @@ mod tests { use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::AuthMode; + use codex_app_server_protocol::CommandExecutionApprovalDecision; + use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::FileChangeRequestApprovalParams; @@ -661,6 +681,7 @@ mod tests { use codex_app_server_protocol::ModelReroutedNotification; use codex_app_server_protocol::RateLimitSnapshot; use codex_app_server_protocol::RateLimitWindow; + use codex_app_server_protocol::ServerResponse; use codex_app_server_protocol::ToolRequestUserInputParams; use codex_protocol::ThreadId; use pretty_assertions::assert_eq; @@ -840,10 +861,52 @@ mod tests { ); } + #[test] + fn server_request_response_from_result_decodes_typed_response() { + let request = ServerRequest::CommandExecutionRequestApproval { + request_id: RequestId::Integer(7), + params: CommandExecutionRequestApprovalParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item_id: "item-1".to_string(), + approval_id: None, + reason: None, + network_approval_context: None, + command: Some("echo hi".to_string()), + cwd: None, + command_actions: None, + additional_permissions: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + available_decisions: None, + }, + }; + + let response = request + .response_from_result(json!({ + "decision": "acceptForSession", + })) + .expect("decode typed server response"); + + let ServerResponse::CommandExecutionRequestApproval { + request_id, + response, + } = response + else { + panic!("expected command execution approval response"); + }; + assert_eq!(request_id, RequestId::Integer(7)); + assert_eq!( + response.decision, + CommandExecutionApprovalDecision::AcceptForSession + ); + } + #[tokio::test] async fn send_response_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -878,7 +941,8 @@ mod tests { #[tokio::test] async fn send_response_clears_registered_request_context() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -903,7 +967,8 @@ mod tests { #[tokio::test] async fn send_error_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -941,7 +1006,8 @@ mod tests { #[tokio::test] async fn send_server_notification_to_connection_and_wait_tracks_write_completion() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let send_task = tokio::spawn(async move { outgoing .send_server_notification_to_connection_and_wait( @@ -985,7 +1051,8 @@ mod tests { #[tokio::test] async fn connection_closed_clears_registered_request_contexts() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let closed_connection_request = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -1019,7 +1086,8 @@ mod tests { #[tokio::test] async fn notify_client_error_forwards_error_to_waiter() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let (request_id, wait_for_result) = outgoing .send_request(ServerRequestPayload::ApplyPatchApproval( @@ -1053,7 +1121,10 @@ mod tests { #[tokio::test] async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), @@ -1111,7 +1182,10 @@ mod tests { #[tokio::test] async fn cancel_requests_for_thread_cancels_all_thread_requests() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index f78b8753a9..b1373c293d 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -722,6 +722,7 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), ))); manager @@ -764,6 +765,7 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), ))); manager From a89b81baaf22e62730b4ded64717f1e461cebb9b Mon Sep 17 00:00:00 2001 From: Roy Han Date: Thu, 23 Apr 2026 16:47:36 -0400 Subject: [PATCH 2/9] track replayed server requests --- codex-rs/app-server/src/outgoing_message.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 9001663117..eecb5aa0dd 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -358,12 +358,15 @@ impl OutgoingMessageSender { .sender .send(OutgoingEnvelope::ToConnection { connection_id, - message: OutgoingMessage::Request(request), + message: OutgoingMessage::Request(request.clone()), write_complete_tx: None, }) .await { warn!("failed to resend request to client: {err:?}"); + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request); } } } From 5c30a31222fe24e4246962afc65411ba5dea6f9c Mon Sep 17 00:00:00 2001 From: Roy Han Date: Thu, 23 Apr 2026 16:55:36 -0400 Subject: [PATCH 3/9] track broadcast server request responses --- codex-rs/app-server/src/outgoing_message.rs | 36 ++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index eecb5aa0dd..3f1d4f78e4 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::sync::atomic::AtomicI64; @@ -132,6 +133,7 @@ struct PendingCallbackEntry { callback: oneshot::Sender, thread_id: Option, request: ServerRequest, + tracked_request_connection_ids: HashSet, } impl ThreadScopedOutgoingMessageSender { @@ -300,6 +302,7 @@ impl OutgoingMessageSender { callback: tx_approve, thread_id, request: request.clone(), + tracked_request_connection_ids: HashSet::new(), }, ); } @@ -328,8 +331,8 @@ impl OutgoingMessageSender { send_error = Some(err); break; } else { - self.analytics_events_client - .track_server_request(connection_id.0, request.clone()); + self.track_pending_server_request(*connection_id, &request) + .await; } } match send_error { @@ -365,12 +368,30 @@ impl OutgoingMessageSender { { warn!("failed to resend request to client: {err:?}"); } else { - self.analytics_events_client - .track_server_request(connection_id.0, request); + self.track_pending_server_request(connection_id, &request) + .await; } } } + async fn track_pending_server_request( + &self, + connection_id: ConnectionId, + request: &ServerRequest, + ) { + let request_id = request.id().clone(); + let should_track = { + let mut request_id_to_callback = self.request_id_to_callback.lock().await; + request_id_to_callback + .get_mut(&request_id) + .is_some_and(|entry| entry.tracked_request_connection_ids.insert(connection_id)) + }; + if should_track { + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); + } + } + pub(crate) async fn notify_client_response( &self, connection_id: ConnectionId, @@ -381,6 +402,13 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { + if !entry + .tracked_request_connection_ids + .contains(&connection_id) + { + self.analytics_events_client + .track_server_request(connection_id.0, entry.request.clone()); + } if let Ok(response) = entry.request.response_from_result(result.clone()) { self.analytics_events_client .track_server_response(connection_id.0, response); From 34733b87234f6248f27048eb5b96927ee96dc82c Mon Sep 17 00:00:00 2001 From: Roy Han Date: Thu, 23 Apr 2026 17:03:26 -0400 Subject: [PATCH 4/9] avoid cloning server response payloads --- codex-rs/app-server-protocol/src/protocol/common.rs | 5 +++-- codex-rs/app-server/src/outgoing_message.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 1fc8646fb4..d55aa79ed1 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -689,12 +689,13 @@ macro_rules! server_request_definitions { pub fn response_from_result( &self, - result: crate::Result, + result: &crate::Result, ) -> serde_json::Result { match self { $( Self::$variant { request_id, .. } => { - let response = serde_json::from_value::<$response>(result)?; + let response = + <$response as serde::Deserialize>::deserialize(result)?; Ok(ServerResponse::$variant { request_id: request_id.clone(), response, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 3f1d4f78e4..4b14a2a6c2 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -409,7 +409,7 @@ impl OutgoingMessageSender { self.analytics_events_client .track_server_request(connection_id.0, entry.request.clone()); } - if let Ok(response) = entry.request.response_from_result(result.clone()) { + if let Ok(response) = entry.request.response_from_result(&result) { self.analytics_events_client .track_server_response(connection_id.0, response); } @@ -939,7 +939,7 @@ mod tests { }; let response = request - .response_from_result(json!({ + .response_from_result(&json!({ "decision": "acceptForSession", })) .expect("decode typed server response"); From c092aa338c1c2889f1dfb8c73e73d64e99166786 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Thu, 23 Apr 2026 17:47:25 -0400 Subject: [PATCH 5/9] ignore untracked broadcast server responses --- codex-rs/app-server/src/outgoing_message.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 4b14a2a6c2..de7d810f2f 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -402,14 +402,11 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { - if !entry + if entry .tracked_request_connection_ids .contains(&connection_id) + && let Ok(response) = entry.request.response_from_result(&result) { - self.analytics_events_client - .track_server_request(connection_id.0, entry.request.clone()); - } - if let Ok(response) = entry.request.response_from_result(&result) { self.analytics_events_client .track_server_response(connection_id.0, response); } From 82c12ae6bfd7e0e36ff39452753dcd2f5e4f4b04 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Thu, 23 Apr 2026 17:51:48 -0400 Subject: [PATCH 6/9] simplify server request tracking state --- codex-rs/analytics/src/client.rs | 4 +++ codex-rs/app-server/src/outgoing_message.rs | 35 +++++---------------- 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index 10a362e68f..d018d9912e 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -57,11 +57,15 @@ pub struct AnalyticsEventsClient { impl AnalyticsEventsQueue { pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); + let auth_manager = Arc::downgrade(&auth_manager); tokio::spawn(async move { let mut reducer = AnalyticsReducer::default(); while let Some(input) = receiver.recv().await { let mut events = Vec::new(); reducer.ingest(input, &mut events).await; + let Some(auth_manager) = auth_manager.upgrade() else { + break; + }; send_track_events(&auth_manager, &base_url, events).await; } }); diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index de7d810f2f..2a19564720 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::sync::atomic::AtomicI64; @@ -133,7 +132,7 @@ struct PendingCallbackEntry { callback: oneshot::Sender, thread_id: Option, request: ServerRequest, - tracked_request_connection_ids: HashSet, + track_server_response: bool, } impl ThreadScopedOutgoingMessageSender { @@ -302,7 +301,7 @@ impl OutgoingMessageSender { callback: tx_approve, thread_id, request: request.clone(), - tracked_request_connection_ids: HashSet::new(), + track_server_response: connection_ids.is_some(), }, ); } @@ -331,8 +330,8 @@ impl OutgoingMessageSender { send_error = Some(err); break; } else { - self.track_pending_server_request(*connection_id, &request) - .await; + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -368,30 +367,12 @@ impl OutgoingMessageSender { { warn!("failed to resend request to client: {err:?}"); } else { - self.track_pending_server_request(connection_id, &request) - .await; + self.analytics_events_client + .track_server_request(connection_id.0, request); } } } - async fn track_pending_server_request( - &self, - connection_id: ConnectionId, - request: &ServerRequest, - ) { - let request_id = request.id().clone(); - let should_track = { - let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback - .get_mut(&request_id) - .is_some_and(|entry| entry.tracked_request_connection_ids.insert(connection_id)) - }; - if should_track { - self.analytics_events_client - .track_server_request(connection_id.0, request.clone()); - } - } - pub(crate) async fn notify_client_response( &self, connection_id: ConnectionId, @@ -402,9 +383,7 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { - if entry - .tracked_request_connection_ids - .contains(&connection_id) + if entry.track_server_response && let Ok(response) = entry.request.response_from_result(&result) { self.analytics_events_client From fe7e8e7a0cb20ab31084f78cb06cfad5bf297a7b Mon Sep 17 00:00:00 2001 From: Roy Han Date: Mon, 27 Apr 2026 16:39:15 -0700 Subject: [PATCH 7/9] [codex-analytics] make auth retention explicit --- .../analytics/src/analytics_client_tests.rs | 35 +++++++++++++++ codex-rs/analytics/src/client.rs | 43 ++++++++++++++++--- codex-rs/analytics/src/lib.rs | 1 + codex-rs/app-server/src/analytics_events.rs | 2 + .../app-server/src/bespoke_event_handling.rs | 2 + codex-rs/app-server/src/config_api.rs | 2 + codex-rs/core/src/session/mod.rs | 1 + codex-rs/core/src/session/session.rs | 1 + codex-rs/core/src/session/tests.rs | 2 + 9 files changed, 84 insertions(+), 5 deletions(-) diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 9f88ceeb94..e86373ce44 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -1,4 +1,6 @@ +use crate::client::AnalyticsEventsClient; use crate::client::AnalyticsEventsQueue; +use crate::client::AuthManagerRetention; use crate::events::AppServerRpcTransport; use crate::events::CodexAppMentionedEventRequest; use crate::events::CodexAppServerClientMetadata; @@ -83,6 +85,8 @@ use codex_app_server_protocol::TurnStatus as AppServerTurnStatus; use codex_app_server_protocol::TurnSteerParams; use codex_app_server_protocol::TurnSteerResponse; use codex_app_server_protocol::UserInput; +use codex_login::AuthManager; +use codex_login::CodexAuth; use codex_login::default_client::DEFAULT_ORIGINATOR; use codex_login::default_client::originator; use codex_plugin::AppConnectorId; @@ -793,6 +797,37 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() { assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true); } +#[tokio::test] +async fn default_client_retains_auth_manager() { + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test")); + let weak_auth_manager = Arc::downgrade(&auth_manager); + + let _client = AnalyticsEventsClient::new( + auth_manager, + "http://localhost".to_string(), + None, + AuthManagerRetention::Strong, + ); + + assert!(weak_auth_manager.upgrade().is_some()); +} + +#[tokio::test] +async fn non_owning_client_does_not_retain_auth_manager() { + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test")); + let weak_auth_manager = Arc::downgrade(&auth_manager); + + let _client = AnalyticsEventsClient::new( + Arc::clone(&auth_manager), + "http://localhost".to_string(), + None, + AuthManagerRetention::Weak, + ); + drop(auth_manager); + + assert!(weak_auth_manager.upgrade().is_none()); +} + #[test] fn thread_initialized_event_serializes_expected_shape() { let event = TrackEventRequest::ThreadInitialized(ThreadInitializedEvent { diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index dbc15c1d6e..ea68918364 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -54,16 +54,29 @@ pub struct AnalyticsEventsClient { queue: Option, } +#[derive(Debug, Clone, Copy)] +pub enum AuthManagerRetention { + Strong, + Weak, +} + impl AnalyticsEventsQueue { - pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { + pub(crate) fn new( + auth_manager: Arc, + base_url: String, + retention: AuthManagerRetention, + ) -> Self { let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); - let auth_manager = Arc::downgrade(&auth_manager); + let auth_manager = match retention { + AuthManagerRetention::Strong => AuthManagerHandle::Strong(auth_manager), + AuthManagerRetention::Weak => AuthManagerHandle::Weak(Arc::downgrade(&auth_manager)), + }; tokio::spawn(async move { let mut reducer = AnalyticsReducer::default(); while let Some(input) = receiver.recv().await { let mut events = Vec::new(); reducer.ingest(input, &mut events).await; - let Some(auth_manager) = auth_manager.upgrade() else { + let Some(auth_manager) = auth_manager.get() else { break; }; send_track_events(&auth_manager, &base_url, events).await; @@ -117,15 +130,35 @@ impl AnalyticsEventsQueue { } } +enum AuthManagerHandle { + Strong(Arc), + Weak(std::sync::Weak), +} + +impl AuthManagerHandle { + fn get(&self) -> Option> { + match self { + Self::Strong(auth_manager) => Some(Arc::clone(auth_manager)), + Self::Weak(auth_manager) => auth_manager.upgrade(), + } + } +} + impl AnalyticsEventsClient { pub fn new( auth_manager: Arc, base_url: String, analytics_enabled: Option, + auth_manager_retention: AuthManagerRetention, ) -> Self { Self { - queue: (analytics_enabled != Some(false)) - .then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)), + queue: (analytics_enabled != Some(false)).then(|| { + AnalyticsEventsQueue::new( + Arc::clone(&auth_manager), + base_url, + auth_manager_retention, + ) + }), } } diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index ed0f1036ca..02bf88b2b2 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -7,6 +7,7 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; pub use client::AnalyticsEventsClient; +pub use client::AuthManagerRetention; pub use events::AppServerRpcTransport; pub use events::GuardianApprovalRequestSource; pub use events::GuardianReviewAnalyticsResult; diff --git a/codex-rs/app-server/src/analytics_events.rs b/codex-rs/app-server/src/analytics_events.rs index 24ed12d2ad..c41bfc8b31 100644 --- a/codex-rs/app-server/src/analytics_events.rs +++ b/codex-rs/app-server/src/analytics_events.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use codex_analytics::AnalyticsEventsClient; +use codex_analytics::AuthManagerRetention; use codex_core::config::Config; use codex_login::AuthManager; @@ -12,5 +13,6 @@ pub(crate) fn analytics_events_client_from_config( auth_manager, config.chatgpt_base_url.trim_end_matches('/').to_string(), config.analytics_enabled, + AuthManagerRetention::Weak, ) } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index f0300e8ed1..89ff3d1493 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -3118,6 +3118,7 @@ mod tests { use anyhow::Result; use anyhow::anyhow; use anyhow::bail; + use codex_analytics::AuthManagerRetention; use codex_app_server_protocol::AutoReviewDecisionSource; use codex_app_server_protocol::GuardianApprovalReviewStatus; use codex_app_server_protocol::JSONRPCErrorError; @@ -3615,6 +3616,7 @@ mod tests { ), "http://localhost".to_string(), Some(false), + AuthManagerRetention::Strong, ), codex_home: codex_home.path().to_path_buf(), }; diff --git a/codex-rs/app-server/src/config_api.rs b/codex-rs/app-server/src/config_api.rs index e8bb82777c..5503f837fc 100644 --- a/codex-rs/app-server/src/config_api.rs +++ b/codex-rs/app-server/src/config_api.rs @@ -467,6 +467,7 @@ mod tests { use super::*; use crate::config_manager::apply_runtime_feature_enablement; use codex_analytics::AnalyticsEventsClient; + use codex_analytics::AuthManagerRetention; use codex_arg0::Arg0DispatchPaths; use codex_config::CloudRequirementsLoader; use codex_config::LoaderOverrides; @@ -832,6 +833,7 @@ mod tests { .trim_end_matches('/') .to_string(), analytics_config.analytics_enabled, + AuthManagerRetention::Strong, ), ); diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 1ebe2b4fc7..6a46b072b2 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -46,6 +46,7 @@ use async_channel::Sender; use chrono::Local; use chrono::Utc; use codex_analytics::AnalyticsEventsClient; +use codex_analytics::AuthManagerRetention; use codex_analytics::SubAgentThreadStartedInput; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index dcadac70a8..c3fd6e9cff 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -776,6 +776,7 @@ impl Session { Arc::clone(&auth_manager), config.chatgpt_base_url.trim_end_matches('/').to_string(), config.analytics_enabled, + AuthManagerRetention::Strong, ) }); let services = SessionServices { diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 9515755323..4876b9da3c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3423,6 +3423,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { Arc::clone(&auth_manager), config.chatgpt_base_url.trim_end_matches('/').to_string(), config.analytics_enabled, + AuthManagerRetention::Strong, ), hooks: Hooks::new(HooksConfig { legacy_notify_argv: config.notify.clone(), @@ -4782,6 +4783,7 @@ where Arc::clone(&auth_manager), config.chatgpt_base_url.trim_end_matches('/').to_string(), config.analytics_enabled, + AuthManagerRetention::Strong, ), hooks: Hooks::new(HooksConfig { legacy_notify_argv: config.notify.clone(), From 3be8cc539fddc85123758ac8385d469fdf9e7a15 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Mon, 27 Apr 2026 16:54:31 -0700 Subject: [PATCH 8/9] revert tracing test stack wrapper --- .../src/message_processor/tracing_tests.rs | 136 +++++++++--------- 1 file changed, 67 insertions(+), 69 deletions(-) diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index ed8762370a..5b1eba55a9 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -732,79 +732,77 @@ async fn remote_control_origin_rejects_device_key_requests() -> Result<()> { Ok(()) } -#[test] +#[tokio::test(flavor = "current_thread")] #[serial(app_server_tracing)] -fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> { - run_current_thread_test_with_stack("turn_start_jsonrpc_span_parents_core_turn_spans", async { - let mut harness = TracingHarness::new().await?; - let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await; - let thread_id = thread_start_response.thread.id.clone(); +async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> { + let mut harness = TracingHarness::new().await?; + let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await; + let thread_id = thread_start_response.thread.id.clone(); - harness.reset_tracing(); + harness.reset_tracing(); - let RemoteTrace { - trace_id: remote_trace_id, - parent_span_id: remote_parent_span_id, - context: remote_trace, - } = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088"); - let turn_start_response: TurnStartResponse = harness - .request( - ClientRequest::TurnStart { - request_id: RequestId::Integer(3), - params: TurnStartParams { - environments: None, - thread_id, - input: vec![UserInput::Text { - text: "hello".to_string(), - text_elements: Vec::new(), - }], - responsesapi_client_metadata: None, - cwd: None, - approval_policy: None, - sandbox_policy: None, - permission_profile: None, - approvals_reviewer: None, - model: None, - service_tier: None, - effort: None, - summary: None, - personality: None, - output_schema: None, - collaboration_mode: None, - }, + let RemoteTrace { + trace_id: remote_trace_id, + parent_span_id: remote_parent_span_id, + context: remote_trace, + } = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088"); + let turn_start_response: TurnStartResponse = harness + .request( + ClientRequest::TurnStart { + request_id: RequestId::Integer(3), + params: TurnStartParams { + environments: None, + thread_id, + input: vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + responsesapi_client_metadata: None, + cwd: None, + approval_policy: None, + sandbox_policy: None, + permission_profile: None, + approvals_reviewer: None, + model: None, + service_tier: None, + effort: None, + summary: None, + personality: None, + output_schema: None, + collaboration_mode: None, }, - Some(remote_trace), - ) - .await; - let spans = wait_for_exported_spans(harness.tracing, |spans| { - spans.iter().any(|span| { - span.span_kind == SpanKind::Server - && span_attr(span, "rpc.method") == Some("turn/start") - && span.span_context.trace_id() == remote_trace_id - }) && spans.iter().any(|span| { - span_attr(span, "codex.op") == Some("user_input") - && span.span_context.trace_id() == remote_trace_id - }) - }) + }, + Some(remote_trace), + ) .await; - - let server_request_span = - find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id); - let core_turn_span = - find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| { - span_attr(span, "codex.op") == Some("user_input") - }); - - assert_eq!(server_request_span.parent_span_id, remote_parent_span_id); - assert!(server_request_span.parent_span_is_remote); - assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id); - assert_eq!( - span_attr(server_request_span, "turn.id"), - Some(turn_start_response.turn.id.as_str()) - ); - assert_span_descends_from(&spans, core_turn_span, server_request_span); - harness.shutdown().await; - - Ok(()) + let spans = wait_for_exported_spans(harness.tracing, |spans| { + spans.iter().any(|span| { + span.span_kind == SpanKind::Server + && span_attr(span, "rpc.method") == Some("turn/start") + && span.span_context.trace_id() == remote_trace_id + }) && spans.iter().any(|span| { + span_attr(span, "codex.op") == Some("user_input") + && span.span_context.trace_id() == remote_trace_id + }) }) + .await; + + let server_request_span = + find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id); + let core_turn_span = + find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| { + span_attr(span, "codex.op") == Some("user_input") + }); + + assert_eq!(server_request_span.parent_span_id, remote_parent_span_id); + assert!(server_request_span.parent_span_is_remote); + assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id); + assert_eq!( + span_attr(server_request_span, "turn.id"), + Some(turn_start_response.turn.id.as_str()) + ); + assert_span_descends_from(&spans, core_turn_span, server_request_span); + harness.shutdown().await; + + Ok(()) } From 54591dac4e2a19f6020e9550ac2e656e86b634d7 Mon Sep 17 00:00:00 2001 From: Roy Han Date: Mon, 27 Apr 2026 17:02:45 -0700 Subject: [PATCH 9/9] [codex-analytics] annotate analytics-enabled test args --- codex-rs/analytics/src/analytics_client_tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index e86373ce44..9179ea90cf 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -805,7 +805,7 @@ async fn default_client_retains_auth_manager() { let _client = AnalyticsEventsClient::new( auth_manager, "http://localhost".to_string(), - None, + /*analytics_enabled*/ None, AuthManagerRetention::Strong, ); @@ -820,7 +820,7 @@ async fn non_owning_client_does_not_retain_auth_manager() { let _client = AnalyticsEventsClient::new( Arc::clone(&auth_manager), "http://localhost".to_string(), - None, + /*analytics_enabled*/ None, AuthManagerRetention::Weak, ); drop(auth_manager);