[codex-analytics] feature plumbing and emittance

This commit is contained in:
rhan-oai
2026-04-06 21:40:43 -07:00
parent 24c598e8a9
commit a860b59e5d
21 changed files with 1269 additions and 43 deletions

View File

@@ -5,6 +5,8 @@ use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexTurnEventParams;
use crate::events::CodexTurnEventRequest;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
@@ -26,11 +28,22 @@ use crate::facts::PluginStateChangedInput;
use crate::facts::PluginUsedInput;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::UserInput;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope;
use sha1::Digest;
@@ -39,6 +52,8 @@ use std::path::Path;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
requests: HashMap<(u64, RequestId), RequestState>,
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
}
@@ -47,6 +62,32 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
enum RequestState {
TurnStart(PendingTurnStartState),
}
struct PendingTurnStartState {
thread_id: String,
num_input_images: usize,
}
#[derive(Clone)]
struct CompletedTurnState {
status: Option<TurnStatus>,
turn_error: Option<CodexErrorInfo>,
completed_at: u64,
duration_ms: Option<u64>,
}
struct TurnState {
connection_id: Option<u64>,
thread_id: Option<String>,
num_input_images: Option<usize>,
resolved_config: Option<TurnResolvedConfigFact>,
started_at: Option<u64>,
completed: Option<CompletedTurnState>,
}
impl AnalyticsReducer {
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
match input {
@@ -66,21 +107,28 @@ impl AnalyticsReducer {
);
}
AnalyticsFact::Request {
connection_id: _connection_id,
request_id: _request_id,
request: _request,
} => {}
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
connection_id,
response,
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::Notification(_notification) => {}
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::TurnResolvedConfig(input) => {
self.ingest_turn_resolved_config(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -135,6 +183,52 @@ impl AnalyticsReducer {
));
}
fn ingest_request(
&mut self,
connection_id: u64,
request_id: RequestId,
request: ClientRequest,
) {
let ClientRequest::TurnStart { params, .. } = request else {
return;
};
self.requests.insert(
(connection_id, request_id),
RequestState::TurnStart(PendingTurnStartState {
thread_id: params.thread_id,
num_input_images: params
.input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
}),
);
}
fn ingest_turn_resolved_config(
&mut self,
input: TurnResolvedConfigFact,
out: &mut Vec<TrackEventRequest>,
) {
let turn_id = input.turn_id.clone();
let thread_id = input.thread_id.clone();
let num_input_images = input.num_input_images;
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
completed: None,
});
turn_state.thread_id = Some(thread_id);
turn_state.num_input_images = Some(num_input_images);
turn_state.resolved_config = Some(input);
self.maybe_emit_turn_event(&turn_id, out);
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,
@@ -235,24 +329,124 @@ impl AnalyticsReducer {
response: ClientResponse,
out: &mut Vec<TrackEventRequest>,
) {
let (thread, model, initialization_mode) = match response {
ClientResponse::ThreadStart { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::New,
),
ClientResponse::ThreadResume { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Resumed,
),
ClientResponse::ThreadFork { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Forked,
),
_ => return,
};
match response {
ClientResponse::ThreadStart { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::New,
out,
);
}
ClientResponse::ThreadResume { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Resumed,
out,
);
}
ClientResponse::ThreadFork { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Forked,
out,
);
}
ClientResponse::TurnStart {
request_id,
response,
} => {
let turn_id = response.turn.id;
let Some(RequestState::TurnStart(pending_request)) =
self.requests.remove(&(connection_id, request_id))
else {
return;
};
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
completed: None,
});
turn_state.connection_id = Some(connection_id);
turn_state.thread_id = Some(pending_request.thread_id);
turn_state.num_input_images = Some(pending_request.num_input_images);
self.maybe_emit_turn_event(&turn_id, out);
}
_ => {}
}
}
fn ingest_notification(
&mut self,
notification: ServerNotification,
out: &mut Vec<TrackEventRequest>,
) {
match notification {
ServerNotification::TurnStarted(notification) => {
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
completed: None,
});
turn_state.started_at = notification
.turn
.started_at
.and_then(|started_at| u64::try_from(started_at).ok());
}
ServerNotification::TurnCompleted(notification) => {
let turn_state =
self.turns
.entry(notification.turn.id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
completed: None,
});
turn_state.completed = Some(CompletedTurnState {
status: analytics_turn_status(notification.turn.status),
turn_error: notification
.turn
.error
.and_then(|error| error.codex_error_info),
completed_at: notification
.turn
.completed_at
.and_then(|completed_at| u64::try_from(completed_at).ok())
.unwrap_or_default(),
duration_ms: notification
.turn
.duration_ms
.and_then(|duration_ms| u64::try_from(duration_ms).ok()),
});
let turn_id = notification.turn.id;
self.maybe_emit_turn_event(&turn_id, out);
}
_ => {}
}
}
fn emit_thread_initialized(
&mut self,
connection_id: u64,
thread: codex_app_server_protocol::Thread,
model: String,
initialization_mode: ThreadInitializationMode,
out: &mut Vec<TrackEventRequest>,
) {
let thread_source: SessionSource = thread.source.into();
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
@@ -275,6 +469,143 @@ impl AnalyticsReducer {
},
));
}
fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec<TrackEventRequest>) {
let Some(turn_state) = self.turns.get(turn_id) else {
return;
};
if turn_state.thread_id.is_none()
|| turn_state.num_input_images.is_none()
|| turn_state.resolved_config.is_none()
|| turn_state.completed.is_none()
{
return;
}
let product_client_id = turn_state
.connection_id
.and_then(|connection_id| self.connections.get(&connection_id))
.map(|connection_state| connection_state.app_server_client.product_client_id.clone())
.unwrap_or_else(|| originator().value);
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
product_client_id,
turn_id.to_string(),
turn_state,
),
},
)));
self.turns.remove(turn_id);
}
}
fn codex_turn_event_params(
product_client_id: String,
turn_id: String,
turn_state: &TurnState,
) -> CodexTurnEventParams {
let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = (
turn_state.thread_id.clone(),
turn_state.num_input_images,
turn_state.resolved_config.clone(),
turn_state.completed.clone(),
) else {
unreachable!("turn event params require a fully populated turn state");
};
let started_at = turn_state.started_at;
let TurnResolvedConfigFact {
turn_id: _resolved_turn_id,
thread_id: _resolved_thread_id,
num_input_images: _resolved_num_input_images,
submission_type,
model,
model_provider,
sandbox_policy,
reasoning_effort,
reasoning_summary,
service_tier,
approval_policy,
approvals_reviewer,
sandbox_network_access,
collaboration_mode,
personality,
is_first_turn,
} = resolved_config;
CodexTurnEventParams {
thread_id,
turn_id,
product_client_id,
submission_type,
model: Some(model),
model_provider,
sandbox_policy: Some(sandbox_policy_mode(&sandbox_policy)),
reasoning_effort: reasoning_effort.map(|value| value.to_string()),
reasoning_summary: reasoning_summary_mode(reasoning_summary),
service_tier: service_tier
.map(|value| value.to_string())
.unwrap_or_else(|| "default".to_string()),
approval_policy: approval_policy.to_string(),
approvals_reviewer: approvals_reviewer.to_string(),
sandbox_network_access,
collaboration_mode: Some(collaboration_mode_mode(collaboration_mode)),
personality: personality_mode(personality),
num_input_images,
is_first_turn,
status: completed.status,
turn_error: completed.turn_error,
steer_count: None,
total_tool_call_count: None,
shell_command_count: None,
file_change_count: None,
mcp_tool_call_count: None,
dynamic_tool_call_count: None,
subagent_tool_call_count: None,
web_search_count: None,
image_generation_count: None,
duration_ms: completed.duration_ms,
started_at,
completed_at: Some(completed.completed_at),
}
}
fn sandbox_policy_mode(sandbox_policy: &SandboxPolicy) -> &'static str {
match sandbox_policy {
SandboxPolicy::DangerFullAccess => "full_access",
SandboxPolicy::ReadOnly { .. } => "read_only",
SandboxPolicy::WorkspaceWrite { .. } => "workspace_write",
SandboxPolicy::ExternalSandbox { .. } => "external_sandbox",
}
}
fn collaboration_mode_mode(mode: ModeKind) -> &'static str {
match mode {
ModeKind::Plan => "plan",
ModeKind::Default | ModeKind::PairProgramming | ModeKind::Execute => "default",
}
}
fn reasoning_summary_mode(summary: Option<ReasoningSummary>) -> Option<String> {
match summary {
Some(ReasoningSummary::None) | None => None,
Some(summary) => Some(summary.to_string()),
}
}
fn personality_mode(personality: Option<Personality>) -> Option<String> {
match personality {
Some(Personality::None) | None => None,
Some(personality) => Some(personality.to_string()),
}
}
fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Option<TurnStatus> {
match status {
codex_app_server_protocol::TurnStatus::Completed => Some(TurnStatus::Completed),
codex_app_server_protocol::TurnStatus::Failed => Some(TurnStatus::Failed),
codex_app_server_protocol::TurnStatus::Interrupted => Some(TurnStatus::Interrupted),
codex_app_server_protocol::TurnStatus::InProgress => None,
}
}
pub(crate) fn skill_id_for_local_skill(