[codex-analytics] feature plumbing and emittance (#16640)

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/16640).
* #16870
* #16706
* #16641
* __->__ #16640
This commit is contained in:
rhan-oai
2026-04-13 23:11:49 -07:00
committed by GitHub
parent 05c5829923
commit b704df85b8
28 changed files with 2511 additions and 118 deletions

View File

@@ -22,6 +22,9 @@ use chrono::DateTime;
use chrono::SecondsFormat;
use chrono::Utc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AnalyticsJsonRpcError;
use codex_analytics::InputError;
use codex_analytics::TurnSteerRequestError;
use codex_app_server_protocol::Account;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
@@ -36,7 +39,7 @@ use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo as AppServerCodexErrorInfo;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
use codex_app_server_protocol::CommandExecParams;
@@ -642,6 +645,22 @@ impl CodexMessageProcessor {
}
}
fn track_error_response(
&self,
request_id: &ConnectionRequestId,
error: &JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
) {
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_error_response(
request_id.connection_id.0,
request_id.request_id.clone(),
error.clone(),
error_type,
);
}
}
async fn load_thread(
&self,
thread_id: &str,
@@ -6920,12 +6939,18 @@ impl CodexMessageProcessor {
app_server_client_version: Option<String>,
) {
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.track_error_response(&request_id, &error, /*error_type*/ None);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -6937,6 +6962,7 @@ impl CodexMessageProcessor {
)
.await
{
self.track_error_response(&request_id, &error, /*error_type*/ None);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -7020,6 +7046,15 @@ impl CodexMessageProcessor {
};
let response = TurnStartResponse { turn };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
@@ -7028,6 +7063,7 @@ impl CodexMessageProcessor {
message: format!("failed to start turn: {err}"),
data: None,
};
self.track_error_response(&request_id, &error, /*error_type*/ None);
self.outgoing.send_error(request_id, error).await;
}
}
@@ -7101,6 +7137,7 @@ impl CodexMessageProcessor {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.track_error_response(&request_id, &error, /*error_type*/ None);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -7118,6 +7155,11 @@ impl CodexMessageProcessor {
.record_request_turn_id(&request_id, &params.expected_turn_id)
.await;
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -7138,36 +7180,51 @@ impl CodexMessageProcessor {
{
Ok(turn_id) => {
let response = TurnSteerResponse { turn_id };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnSteer {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let (code, message, data) = match err {
let (code, message, data, error_type) = match err {
SteerInputError::NoActiveTurn(_) => (
INVALID_REQUEST_ERROR_CODE,
"no active turn to steer".to_string(),
None,
Some(AnalyticsJsonRpcError::TurnSteer(
TurnSteerRequestError::NoActiveTurn,
)),
),
SteerInputError::ExpectedTurnMismatch { expected, actual } => (
INVALID_REQUEST_ERROR_CODE,
format!("expected active turn id `{expected}` but found `{actual}`"),
None,
Some(AnalyticsJsonRpcError::TurnSteer(
TurnSteerRequestError::ExpectedTurnMismatch,
)),
),
SteerInputError::ActiveTurnNotSteerable { turn_kind } => {
let message = match turn_kind {
codex_protocol::protocol::NonSteerableTurnKind::Review => {
"cannot steer a review turn".to_string()
}
codex_protocol::protocol::NonSteerableTurnKind::Compact => {
"cannot steer a compact turn".to_string()
}
let (message, turn_steer_error) = match turn_kind {
codex_protocol::protocol::NonSteerableTurnKind::Review => (
"cannot steer a review turn".to_string(),
TurnSteerRequestError::NonSteerableReview,
),
codex_protocol::protocol::NonSteerableTurnKind::Compact => (
"cannot steer a compact turn".to_string(),
TurnSteerRequestError::NonSteerableCompact,
),
};
let error = TurnError {
message: message.clone(),
codex_error_info: Some(
AppServerCodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: turn_kind.into(),
},
),
codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: turn_kind.into(),
}),
additional_details: None,
};
let data = match serde_json::to_value(error) {
@@ -7180,12 +7237,18 @@ impl CodexMessageProcessor {
None
}
};
(INVALID_REQUEST_ERROR_CODE, message, data)
(
INVALID_REQUEST_ERROR_CODE,
message,
data,
Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)),
)
}
SteerInputError::EmptyInput => (
INVALID_REQUEST_ERROR_CODE,
"input must not be empty".to_string(),
None,
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
),
};
let error = JSONRPCErrorError {
@@ -7193,6 +7256,7 @@ impl CodexMessageProcessor {
message,
data,
};
self.track_error_response(&request_id, &error, error_type);
self.outgoing.send_error(request_id, error).await;
}
}
@@ -7940,6 +8004,9 @@ impl CodexMessageProcessor {
conversation_id,
conversation.clone(),
thread_manager.clone(),
listener_task_context
.general_analytics_enabled
.then(|| listener_task_context.analytics_events_client.clone()),
thread_outgoing,
thread_state.clone(),
thread_watch_manager.clone(),