[codex-analytics] emit basic tool events from item lifecycle

This commit is contained in:
rhan-oai
2026-04-08 21:03:26 -07:00
parent 68d22f3347
commit c01eb26e94
6 changed files with 499 additions and 9 deletions

View File

@@ -39,12 +39,18 @@ use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -119,6 +125,50 @@ fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str)
}
}
fn sample_initialize_fact(connection_id: u64) -> AnalyticsFact {
AnalyticsFact::Initialize {
connection_id,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
product_client_id: DEFAULT_ORIGINATOR.to_string(),
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "linux".to_string(),
runtime_os_version: "24.04".to_string(),
runtime_arch: "x86_64".to_string(),
},
rpc_transport: AppServerRpcTransport::Websocket,
}
}
fn sample_command_execution_item(
status: CommandExecutionStatus,
exit_code: Option<i32>,
duration_ms: Option<i64>,
) -> ThreadItem {
ThreadItem::CommandExecution {
id: "tool-call-1".to_string(),
command: "echo hi".to_string(),
cwd: PathBuf::from("/tmp"),
process_id: None,
source: CommandExecutionSource::Agent,
status,
command_actions: Vec::new(),
aggregated_output: None,
exit_code,
duration_ms,
}
}
fn expected_absolute_path(path: &PathBuf) -> String {
std::fs::canonicalize(path)
.unwrap_or_else(|_| path.to_path_buf())
@@ -537,6 +587,75 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
}
#[tokio::test]
async fn item_lifecycle_notifications_publish_basic_tool_call_event() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
reducer
.ingest(sample_initialize_fact(/*connection_id*/ 7), &mut events)
.await;
reducer
.ingest(
AnalyticsFact::Notification {
connection_id: 7,
notification: Box::new(ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
})),
},
&mut events,
)
.await;
assert!(events.is_empty(), "tool event should emit on completion");
reducer
.ingest(
AnalyticsFact::Notification {
connection_id: 7,
notification: Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
},
)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_tool_call_event");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1");
assert_eq!(payload[0]["event_params"]["tool_call_id"], "tool-call-1");
assert_eq!(payload[0]["event_params"]["tool_name"], "shell");
assert_eq!(payload[0]["event_params"]["tool_kind"], "shell");
assert_eq!(payload[0]["event_params"]["terminal_status"], "completed");
assert_eq!(
payload[0]["event_params"]["failure_kind"],
serde_json::Value::Null
);
assert_eq!(payload[0]["event_params"]["exit_code"], 0);
assert_eq!(payload[0]["event_params"]["duration_ms"], 42);
assert_eq!(payload[0]["event_params"]["execution_started"], true);
assert_eq!(
payload[0]["event_params"]["app_server_client"]["client_name"],
"codex-tui"
);
}
#[test]
fn subagent_thread_started_review_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(

View File

@@ -16,6 +16,7 @@ use crate::facts::TrackEventsContext;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
@@ -250,6 +251,13 @@ impl AnalyticsEventsClient {
response: Box::new(response),
});
}
pub fn track_notification(&self, connection_id: u64, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification {
connection_id,
notification: Box::new(notification),
});
}
}
async fn send_track_events(

View File

@@ -37,7 +37,6 @@ pub(crate) enum TrackEventRequest {
ThreadInitialized(ThreadInitializedEvent),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
#[allow(dead_code)]
ToolCall(CodexToolCallEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
@@ -103,7 +102,6 @@ pub(crate) struct ThreadInitializedEvent {
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolKind {
Shell,
UnifiedExec,
@@ -115,37 +113,46 @@ pub(crate) enum ToolKind {
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolCallFinalReviewOutcome {
NotNeeded,
#[allow(dead_code)]
GuardianApproved,
#[allow(dead_code)]
GuardianDenied,
#[allow(dead_code)]
GuardianAborted,
#[allow(dead_code)]
UserApproved,
#[allow(dead_code)]
UserApprovedForSession,
#[allow(dead_code)]
UserDenied,
#[allow(dead_code)]
UserAborted,
#[allow(dead_code)]
ConfigAllowed,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolCallTerminalStatus {
Completed,
Failed,
Rejected,
#[allow(dead_code)]
Interrupted,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolCallFailureKind {
ToolError,
ApprovalDenied,
#[allow(dead_code)]
ApprovalAborted,
#[allow(dead_code)]
SandboxDenied,
#[allow(dead_code)]
PolicyForbidden,
}

View File

@@ -90,7 +90,10 @@ pub(crate) enum AnalyticsFact {
ServerResponse {
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
Notification {
connection_id: u64,
notification: Box<ServerNotification>,
},
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.
Custom(CustomAnalyticsFact),

View File

@@ -5,11 +5,17 @@ use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolCallEventParams;
use crate::events::CodexToolCallEventRequest;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ToolCallFailureKind;
use crate::events::ToolCallFinalReviewOutcome;
use crate::events::ToolCallTerminalStatus;
use crate::events::ToolKind;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_plugin_metadata;
@@ -27,7 +33,16 @@ use crate::facts::PluginUsedInput;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CollabAgentTool;
use codex_app_server_protocol::CollabAgentToolCallStatus;
use codex_app_server_protocol::CommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::DynamicToolCallStatus;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::McpToolCallStatus;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadItem;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
@@ -36,10 +51,13 @@ use codex_protocol::protocol::SkillScope;
use sha1::Digest;
use std::collections::HashMap;
use std::path::Path;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
connections: HashMap<u64, ConnectionState>,
tool_calls: HashMap<String, ToolCallState>,
}
struct ConnectionState {
@@ -47,6 +65,21 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
struct ToolCallState {
connection_id: u64,
started_at: u64,
}
struct ToolCallCompletedMetadata {
tool_call_id: String,
tool_name: String,
tool_kind: ToolKind,
terminal_status: ToolCallTerminalStatus,
failure_kind: Option<ToolCallFailureKind>,
exit_code: Option<i32>,
duration_ms: Option<u64>,
}
impl AnalyticsReducer {
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
match input {
@@ -83,7 +116,12 @@ impl AnalyticsReducer {
AnalyticsFact::ServerResponse {
response: _response,
} => {}
AnalyticsFact::Notification(_notification) => {}
AnalyticsFact::Notification {
connection_id,
notification,
} => {
self.ingest_notification(connection_id, *notification, out);
}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
@@ -282,6 +320,315 @@ impl AnalyticsReducer {
},
));
}
fn ingest_notification(
&mut self,
connection_id: u64,
notification: ServerNotification,
out: &mut Vec<TrackEventRequest>,
) {
match notification {
ServerNotification::ItemStarted(notification) => {
if let Some(tool_call_id) = tool_call_id(&notification.item) {
self.tool_calls
.entry(tool_call_id.to_string())
.or_insert_with(|| ToolCallState {
connection_id,
started_at: now_unix_secs(),
});
}
}
ServerNotification::ItemCompleted(notification) => {
let Some(completed_metadata) = tool_call_completed_metadata(&notification.item)
else {
return;
};
let Some(started) = self.tool_calls.remove(&completed_metadata.tool_call_id) else {
return;
};
let Some(connection_state) = self.connections.get(&started.connection_id) else {
return;
};
let completed_at = now_unix_secs();
let duration_ms = completed_metadata.duration_ms.or_else(|| {
completed_at
.checked_sub(started.started_at)
.map(|duration_secs| duration_secs.saturating_mul(1000))
});
out.push(TrackEventRequest::ToolCall(CodexToolCallEventRequest {
event_type: "codex_tool_call_event",
event_params: CodexToolCallEventParams {
thread_id: notification.thread_id,
turn_id: notification.turn_id,
tool_call_id: completed_metadata.tool_call_id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
tool_name: completed_metadata.tool_name,
tool_kind: completed_metadata.tool_kind,
started_at: started.started_at,
completed_at: Some(completed_at),
duration_ms,
execution_started: true,
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_review_outcome: ToolCallFinalReviewOutcome::NotNeeded,
terminal_status: completed_metadata.terminal_status,
failure_kind: completed_metadata.failure_kind,
exit_code: completed_metadata.exit_code,
requested_additional_permissions: false,
requested_network_access: false,
retry_count: 0,
},
}));
}
_ => {}
}
}
}
fn now_unix_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.unwrap_or_default()
}
fn tool_call_id(item: &ThreadItem) -> Option<&str> {
match item {
ThreadItem::CommandExecution { id, .. }
| ThreadItem::FileChange { id, .. }
| ThreadItem::McpToolCall { id, .. }
| ThreadItem::DynamicToolCall { id, .. }
| ThreadItem::CollabAgentToolCall { id, .. }
| ThreadItem::WebSearch { id, .. }
| ThreadItem::ImageGeneration { id, .. } => Some(id),
_ => None,
}
}
fn tool_call_completed_metadata(item: &ThreadItem) -> Option<ToolCallCompletedMetadata> {
match item {
ThreadItem::CommandExecution {
id,
source,
status,
exit_code,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = command_execution_outcome(status)?;
let tool_kind = command_execution_tool_kind(*source);
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: command_execution_tool_name(*source).to_string(),
tool_kind,
terminal_status,
failure_kind,
exit_code: *exit_code,
duration_ms: option_i64_to_u64(*duration_ms),
})
}
ThreadItem::FileChange { id, status, .. } => {
let (terminal_status, failure_kind) = patch_apply_outcome(status)?;
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: "apply_patch".to_string(),
tool_kind: ToolKind::ApplyPatch,
terminal_status,
failure_kind,
exit_code: None,
duration_ms: None,
})
}
ThreadItem::McpToolCall {
id,
tool,
status,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = mcp_tool_call_outcome(status)?;
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: tool.clone(),
tool_kind: ToolKind::Mcp,
terminal_status,
failure_kind,
exit_code: None,
duration_ms: option_i64_to_u64(*duration_ms),
})
}
ThreadItem::DynamicToolCall {
id,
tool,
status,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = dynamic_tool_call_outcome(status)?;
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: tool.clone(),
tool_kind: ToolKind::Dynamic,
terminal_status,
failure_kind,
exit_code: None,
duration_ms: option_i64_to_u64(*duration_ms),
})
}
ThreadItem::CollabAgentToolCall {
id, tool, status, ..
} => {
let (terminal_status, failure_kind) = collab_tool_call_outcome(status)?;
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: collab_agent_tool_name(tool).to_string(),
tool_kind: ToolKind::Other,
terminal_status,
failure_kind,
exit_code: None,
duration_ms: None,
})
}
ThreadItem::WebSearch { id, .. } => Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: "web_search".to_string(),
tool_kind: ToolKind::Other,
terminal_status: ToolCallTerminalStatus::Completed,
failure_kind: None,
exit_code: None,
duration_ms: None,
}),
ThreadItem::ImageGeneration { id, status, .. } => {
let (terminal_status, failure_kind) = image_generation_outcome(status.as_str());
Some(ToolCallCompletedMetadata {
tool_call_id: id.clone(),
tool_name: "image_generation".to_string(),
tool_kind: ToolKind::Other,
terminal_status,
failure_kind,
exit_code: None,
duration_ms: None,
})
}
_ => None,
}
}
fn command_execution_tool_kind(source: CommandExecutionSource) -> ToolKind {
match source {
CommandExecutionSource::UnifiedExecStartup
| CommandExecutionSource::UnifiedExecInteraction => ToolKind::UnifiedExec,
CommandExecutionSource::Agent | CommandExecutionSource::UserShell => ToolKind::Shell,
}
}
fn command_execution_tool_name(source: CommandExecutionSource) -> &'static str {
match source {
CommandExecutionSource::UnifiedExecStartup
| CommandExecutionSource::UnifiedExecInteraction => "unified_exec",
CommandExecutionSource::UserShell => "user_shell",
CommandExecutionSource::Agent => "shell",
}
}
fn command_execution_outcome(
status: &CommandExecutionStatus,
) -> Option<(ToolCallTerminalStatus, Option<ToolCallFailureKind>)> {
match status {
CommandExecutionStatus::InProgress => None,
CommandExecutionStatus::Completed => Some((ToolCallTerminalStatus::Completed, None)),
CommandExecutionStatus::Failed => Some((
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
)),
CommandExecutionStatus::Declined => Some((
ToolCallTerminalStatus::Rejected,
Some(ToolCallFailureKind::ApprovalDenied),
)),
}
}
fn patch_apply_outcome(
status: &PatchApplyStatus,
) -> Option<(ToolCallTerminalStatus, Option<ToolCallFailureKind>)> {
match status {
PatchApplyStatus::InProgress => None,
PatchApplyStatus::Completed => Some((ToolCallTerminalStatus::Completed, None)),
PatchApplyStatus::Failed => Some((
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
)),
PatchApplyStatus::Declined => Some((
ToolCallTerminalStatus::Rejected,
Some(ToolCallFailureKind::ApprovalDenied),
)),
}
}
fn mcp_tool_call_outcome(
status: &McpToolCallStatus,
) -> Option<(ToolCallTerminalStatus, Option<ToolCallFailureKind>)> {
match status {
McpToolCallStatus::InProgress => None,
McpToolCallStatus::Completed => Some((ToolCallTerminalStatus::Completed, None)),
McpToolCallStatus::Failed => Some((
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
)),
}
}
fn dynamic_tool_call_outcome(
status: &DynamicToolCallStatus,
) -> Option<(ToolCallTerminalStatus, Option<ToolCallFailureKind>)> {
match status {
DynamicToolCallStatus::InProgress => None,
DynamicToolCallStatus::Completed => Some((ToolCallTerminalStatus::Completed, None)),
DynamicToolCallStatus::Failed => Some((
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
)),
}
}
fn collab_tool_call_outcome(
status: &CollabAgentToolCallStatus,
) -> Option<(ToolCallTerminalStatus, Option<ToolCallFailureKind>)> {
match status {
CollabAgentToolCallStatus::InProgress => None,
CollabAgentToolCallStatus::Completed => Some((ToolCallTerminalStatus::Completed, None)),
CollabAgentToolCallStatus::Failed => Some((
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
)),
}
}
fn image_generation_outcome(status: &str) -> (ToolCallTerminalStatus, Option<ToolCallFailureKind>) {
match status {
"failed" | "error" => (
ToolCallTerminalStatus::Failed,
Some(ToolCallFailureKind::ToolError),
),
_ => (ToolCallTerminalStatus::Completed, None),
}
}
fn collab_agent_tool_name(tool: &CollabAgentTool) -> &'static str {
match tool {
CollabAgentTool::SpawnAgent => "spawn_agent",
CollabAgentTool::SendInput => "send_input",
CollabAgentTool::ResumeAgent => "resume_agent",
CollabAgentTool::Wait => "wait_agent",
CollabAgentTool::CloseAgent => "close_agent",
}
}
fn option_i64_to_u64(value: Option<i64>) -> Option<u64> {
value.and_then(|value| u64::try_from(value).ok())
}
pub(crate) fn skill_id_for_local_skill(

View File

@@ -537,7 +537,7 @@ impl OutgoingMessageSender {
targeted_connections = connection_ids.len(),
"app-server event: {notification}"
);
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
if connection_ids.is_empty() {
if let Err(err) = self
.sender
@@ -561,6 +561,9 @@ impl OutgoingMessageSender {
.await
{
warn!("failed to send server notification to client: {err:?}");
} else {
self.analytics_events_client
.track_notification(connection_id.0, notification.clone());
}
}
}
@@ -571,7 +574,7 @@ impl OutgoingMessageSender {
notification: ServerNotification,
) {
tracing::trace!("app-server event: {notification}");
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
let (write_complete_tx, write_complete_rx) = oneshot::channel();
if let Err(err) = self
.sender
@@ -583,6 +586,9 @@ impl OutgoingMessageSender {
.await
{
warn!("failed to send server notification to client: {err:?}");
} else {
self.analytics_events_client
.track_notification(connection_id.0, notification);
}
let _ = write_complete_rx.await;
}