Compare commits

...

4 Commits

Author SHA1 Message Date
rhan-oai
3081e2ff35 [codex-analytics] emit terminal tool review events 2026-05-04 16:51:31 -07:00
rhan-oai
b978e3757a [codex-analytics] add tool review event schema 2026-05-04 16:51:31 -07:00
rhan-oai
25fcab2575 [codex-analytics] emit tool item events from item lifecycle 2026-05-04 16:51:31 -07:00
rhan-oai
06f7701777 [codex-analytics] add tool item event schemas 2026-05-04 16:51:31 -07:00
8 changed files with 2448 additions and 57 deletions

View File

@@ -3,12 +3,19 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCommandExecutionEventParams;
use crate::events::CodexCommandExecutionEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolCallReviewEventParams;
use crate::events::CodexToolCallReviewEventRequest;
use crate::events::CodexToolItemEventBase;
use crate::events::CodexTurnEventRequest;
use crate::events::CommandExecutionFamily;
use crate::events::CommandExecutionSource;
use crate::events::GuardianApprovalRequestSource;
use crate::events::GuardianReviewDecision;
use crate::events::GuardianReviewEventParams;
@@ -17,6 +24,12 @@ use crate::events::GuardianReviewTerminalStatus;
use crate::events::GuardianReviewedAction;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ToolItemFinalApprovalOutcome;
use crate::events::ToolItemTerminalStatus;
use crate::events::ToolReviewReviewer;
use crate::events::ToolReviewStatus;
use crate::events::ToolReviewToolKind;
use crate::events::ToolReviewTrigger;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_hook_run_metadata;
@@ -55,23 +68,50 @@ use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use crate::reducer::normalize_path_for_skill_id;
use crate::reducer::skill_id_for_local_skill;
use codex_app_server_protocol::AdditionalNetworkPermissions;
use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionSource as AppServerCommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::GrantedPermissionProfile;
use codex_app_server_protocol::GuardianApprovalReview;
use codex_app_server_protocol::GuardianApprovalReviewAction;
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::GuardianCommandSource as AppServerGuardianCommandSource;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemGuardianApprovalReviewCompletedNotification;
use codex_app_server_protocol::ItemGuardianApprovalReviewStartedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::NetworkPolicyAmendment;
use codex_app_server_protocol::NetworkPolicyRuleAction;
use codex_app_server_protocol::NonSteerableTurnKind;
use codex_app_server_protocol::PermissionGrantScope;
use codex_app_server_protocol::PermissionsRequestApprovalParams;
use codex_app_server_protocol::PermissionsRequestApprovalResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::RequestPermissionProfile;
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -581,6 +621,237 @@ async fn ingest_turn_prerequisites(
}
}
async fn ingest_tool_review_prerequisites(
reducer: &mut AnalyticsReducer,
events: &mut Vec<TrackEventRequest>,
) {
reducer
.ingest(sample_initialize_fact(/*connection_id*/ 7), events)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
"thread-1", /*ephemeral*/ false, "gpt-5",
)),
},
events,
)
.await;
events.clear();
}
fn sample_initialize_fact(connection_id: u64) -> AnalyticsFact {
AnalyticsFact::Initialize {
connection_id,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
product_client_id: DEFAULT_ORIGINATOR.to_string(),
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "linux".to_string(),
runtime_os_version: "24.04".to_string(),
runtime_arch: "x86_64".to_string(),
},
rpc_transport: AppServerRpcTransport::Websocket,
}
}
fn sample_command_execution_item(
status: CommandExecutionStatus,
exit_code: Option<i32>,
duration_ms: Option<i64>,
) -> ThreadItem {
ThreadItem::CommandExecution {
id: "item-1".to_string(),
command: "echo hi".to_string(),
cwd: test_path_buf("/tmp").abs(),
process_id: Some("pid-1".to_string()),
source: AppServerCommandExecutionSource::Agent,
status,
command_actions: Vec::new(),
aggregated_output: None,
exit_code,
duration_ms,
}
}
fn sample_command_approval_request(request_id: i64, approval_id: Option<&str>) -> ServerRequest {
ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(request_id),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: approval_id.map(str::to_string),
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
}
}
fn sample_command_approval_response(
request_id: i64,
decision: CommandExecutionApprovalDecision,
) -> ServerResponse {
ServerResponse::CommandExecutionRequestApproval {
request_id: RequestId::Integer(request_id),
response: CommandExecutionRequestApprovalResponse { decision },
}
}
fn sample_network_policy_command_approval_request(request_id: i64) -> ServerRequest {
ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(request_id),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: None,
reason: Some("network access requested".to_string()),
network_approval_context: None,
command: None,
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: Some(vec![NetworkPolicyAmendment {
host: "example.com".to_string(),
action: NetworkPolicyRuleAction::Allow,
}]),
available_decisions: None,
},
}
}
fn sample_file_change_approval_request(request_id: i64) -> ServerRequest {
ServerRequest::FileChangeRequestApproval {
request_id: RequestId::Integer(request_id),
params: FileChangeRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
reason: None,
grant_root: Some(PathBuf::from("/tmp")),
},
}
}
fn sample_file_change_approval_response(
request_id: i64,
decision: FileChangeApprovalDecision,
) -> ServerResponse {
ServerResponse::FileChangeRequestApproval {
request_id: RequestId::Integer(request_id),
response: FileChangeRequestApprovalResponse { decision },
}
}
fn sample_permissions_approval_request(request_id: i64) -> ServerRequest {
ServerRequest::PermissionsRequestApproval {
request_id: RequestId::Integer(request_id),
params: PermissionsRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
cwd: test_path_buf("/tmp").abs(),
reason: None,
permissions: RequestPermissionProfile {
network: Some(AdditionalNetworkPermissions {
enabled: Some(true),
}),
file_system: None,
},
},
}
}
fn sample_permissions_approval_response(request_id: i64) -> ServerResponse {
ServerResponse::PermissionsRequestApproval {
request_id: RequestId::Integer(request_id),
response: PermissionsRequestApprovalResponse {
permissions: GrantedPermissionProfile {
network: Some(AdditionalNetworkPermissions {
enabled: Some(true),
}),
file_system: None,
},
scope: PermissionGrantScope::Session,
strict_auto_review: None,
},
}
}
fn sample_guardian_review_started(
review_id: &str,
target_item_id: Option<&str>,
) -> ServerNotification {
ServerNotification::ItemGuardianApprovalReviewStarted(
ItemGuardianApprovalReviewStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
review_id: review_id.to_string(),
target_item_id: target_item_id.map(str::to_string),
review: GuardianApprovalReview {
status: GuardianApprovalReviewStatus::InProgress,
risk_level: None,
user_authorization: None,
rationale: None,
},
action: GuardianApprovalReviewAction::Command {
source: AppServerGuardianCommandSource::Shell,
command: "echo hi".to_string(),
cwd: test_path_buf("/tmp").abs(),
},
},
)
}
fn sample_guardian_review_completed(
review_id: &str,
target_item_id: Option<&str>,
status: GuardianApprovalReviewStatus,
) -> ServerNotification {
ServerNotification::ItemGuardianApprovalReviewCompleted(
ItemGuardianApprovalReviewCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
review_id: review_id.to_string(),
target_item_id: target_item_id.map(str::to_string),
decision_source: codex_app_server_protocol::AutoReviewDecisionSource::Agent,
review: GuardianApprovalReview {
status,
risk_level: None,
user_authorization: None,
rationale: None,
},
action: GuardianApprovalReviewAction::Command {
source: AppServerGuardianCommandSource::Shell,
command: "echo hi".to_string(),
cwd: test_path_buf("/tmp").abs(),
},
},
)
}
fn expected_absolute_path(path: &PathBuf) -> String {
std::fs::canonicalize(path)
.unwrap_or_else(|_| path.to_path_buf())
@@ -884,6 +1155,145 @@ fn thread_initialized_event_serializes_expected_shape() {
);
}
#[test]
fn command_execution_event_serializes_expected_shape() {
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
event_type: "codex_command_execution_event",
event_params: CodexCommandExecutionEventParams {
base: CodexToolItemEventBase {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
app_server_client: CodexAppServerClientMetadata {
product_client_id: "codex_tui".to_string(),
client_name: Some("codex-tui".to_string()),
client_version: Some("1.2.3".to_string()),
rpc_transport: AppServerRpcTransport::Websocket,
experimental_api_enabled: Some(true),
},
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "macos".to_string(),
runtime_os_version: "15.3.1".to_string(),
runtime_arch: "aarch64".to_string(),
},
thread_source: Some("user"),
subagent_source: None,
parent_thread_id: None,
tool_name: "shell".to_string(),
started_at_ms: 123_000,
completed_at_ms: 125_000,
duration_ms: Some(2000),
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
terminal_status: ToolItemTerminalStatus::Completed,
failure_kind: None,
requested_additional_permissions: false,
requested_network_access: false,
},
command_execution_source: CommandExecutionSource::Agent,
command_execution_family: CommandExecutionFamily::Shell,
exit_code: Some(0),
command_action_count: Some(1),
},
});
let payload = serde_json::to_value(&event).expect("serialize command execution event");
assert_eq!(
payload,
json!({
"event_type": "codex_command_execution_event",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"item_id": "item-1",
"app_server_client": {
"product_client_id": "codex_tui",
"client_name": "codex-tui",
"client_version": "1.2.3",
"rpc_transport": "websocket",
"experimental_api_enabled": true
},
"runtime": {
"codex_rs_version": "0.99.0",
"runtime_os": "macos",
"runtime_os_version": "15.3.1",
"runtime_arch": "aarch64"
},
"thread_source": "user",
"subagent_source": null,
"parent_thread_id": null,
"tool_name": "shell",
"started_at_ms": 123000,
"completed_at_ms": 125000,
"duration_ms": 2000,
"review_count": 0,
"guardian_review_count": 0,
"user_review_count": 0,
"final_approval_outcome": "not_needed",
"terminal_status": "completed",
"failure_kind": null,
"requested_additional_permissions": false,
"requested_network_access": false,
"command_execution_source": "agent",
"command_execution_family": "shell",
"exit_code": 0,
"command_action_count": 1
}
})
);
}
#[test]
fn tool_call_review_event_serializes_expected_shape() {
let event = TrackEventRequest::ToolCallReview(CodexToolCallReviewEventRequest {
event_type: "codex_tool_call_review_event",
event_params: CodexToolCallReviewEventParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: None,
review_id: "review-1".to_string(),
thread_source: Some("subagent"),
subagent_source: Some("thread_spawn".to_string()),
parent_thread_id: Some("parent-thread-1".to_string()),
tool_kind: ToolReviewToolKind::NetworkAccess,
tool_name: "network_access".to_string(),
reviewer: ToolReviewReviewer::User,
trigger: ToolReviewTrigger::NetworkRetry,
status: ToolReviewStatus::NetworkPolicyAllow,
created_at: 123,
completed_at: Some(125),
duration_ms: Some(2000),
},
});
let payload = serde_json::to_value(&event).expect("serialize tool review event");
assert_eq!(
payload,
json!({
"event_type": "codex_tool_call_review_event",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"item_id": null,
"review_id": "review-1",
"thread_source": "subagent",
"subagent_source": "thread_spawn",
"parent_thread_id": "parent-thread-1",
"tool_kind": "network_access",
"tool_name": "network_access",
"reviewer": "user",
"trigger": "network_retry",
"status": "network_policy_allow",
"created_at": 123,
"completed_at": 125,
"duration_ms": 2000
}
})
);
}
#[tokio::test]
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
let mut reducer = AnalyticsReducer::default();
@@ -1289,6 +1699,416 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
assert_eq!(payload[0]["event_params"]["review_timeout_ms"], 90_000);
}
#[tokio::test]
async fn item_lifecycle_notifications_publish_command_execution_event() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted(
ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: 1_000,
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
},
))),
&mut events,
)
.await;
assert!(
events.is_empty(),
"tool item event should emit on completion"
);
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: 1_042,
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_command_execution_event");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1");
assert_eq!(payload[0]["event_params"]["item_id"], "item-1");
assert_eq!(payload[0]["event_params"]["tool_name"], "shell");
assert_eq!(
payload[0]["event_params"]["command_execution_source"],
"agent"
);
assert_eq!(
payload[0]["event_params"]["command_execution_family"],
"shell"
);
assert_eq!(payload[0]["event_params"]["terminal_status"], "completed");
assert_eq!(
payload[0]["event_params"]["final_approval_outcome"],
"unknown"
);
assert_eq!(
payload[0]["event_params"]["failure_kind"],
serde_json::Value::Null
);
assert_eq!(payload[0]["event_params"]["exit_code"], 0);
assert_eq!(payload[0]["event_params"]["started_at_ms"], 1_000);
assert_eq!(payload[0]["event_params"]["completed_at_ms"], 1_042);
assert_eq!(payload[0]["event_params"]["duration_ms"], 42);
assert_eq!(
payload[0]["event_params"]["app_server_client"]["client_name"],
"codex-tui"
);
assert_eq!(payload[0]["event_params"]["thread_source"], "user");
}
#[tokio::test]
async fn command_execution_approval_response_publishes_user_review_event() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_command_approval_request(
/*request_id*/ 41, /*approval_id*/ None,
)),
},
&mut events,
)
.await;
assert!(events.is_empty());
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_command_approval_response(
/*request_id*/ 41,
CommandExecutionApprovalDecision::Accept,
)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_tool_call_review_event");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1");
assert_eq!(payload[0]["event_params"]["item_id"], "item-1");
assert_eq!(payload[0]["event_params"]["review_id"], "user:41");
assert_eq!(payload[0]["event_params"]["thread_source"], "user");
assert_eq!(payload[0]["event_params"]["tool_kind"], "command_execution");
assert_eq!(payload[0]["event_params"]["tool_name"], "shell");
assert_eq!(payload[0]["event_params"]["reviewer"], "user");
assert_eq!(payload[0]["event_params"]["trigger"], "initial");
assert_eq!(payload[0]["event_params"]["status"], "approved");
assert!(payload[0]["event_params"]["created_at"].as_u64().is_some());
assert!(
payload[0]["event_params"]["completed_at"]
.as_u64()
.is_some()
);
assert!(payload[0]["event_params"]["duration_ms"].as_u64().is_some());
}
#[tokio::test]
async fn command_subapproval_maps_to_subcommand_execve_trigger() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_command_approval_request(
/*request_id*/ 42,
Some("approval-1"),
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_command_approval_response(
/*request_id*/ 42,
CommandExecutionApprovalDecision::AcceptForSession,
)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_params"]["trigger"], "subcommand_execve");
assert_eq!(payload["event_params"]["status"], "approved_for_session");
}
#[tokio::test]
async fn network_policy_amendment_maps_to_network_review_status() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_network_policy_command_approval_request(
/*request_id*/ 43,
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_command_approval_response(
/*request_id*/ 43,
CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment {
network_policy_amendment: NetworkPolicyAmendment {
host: "example.com".to_string(),
action: NetworkPolicyRuleAction::Allow,
},
},
)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_params"]["trigger"], "network_retry");
assert_eq!(payload["event_params"]["status"], "network_policy_allow");
}
#[tokio::test]
async fn file_change_approval_response_maps_terminal_statuses() {
for (request_id, decision, expected_status) in [
(51, FileChangeApprovalDecision::Accept, "approved"),
(52, FileChangeApprovalDecision::Decline, "denied"),
(53, FileChangeApprovalDecision::Cancel, "aborted"),
] {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_file_change_approval_request(request_id)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_file_change_approval_response(request_id, decision)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_params"]["tool_kind"], "file_change");
assert_eq!(payload["event_params"]["tool_name"], "apply_patch");
assert_eq!(payload["event_params"]["trigger"], "sandbox_retry");
assert_eq!(payload["event_params"]["status"], expected_status);
}
}
#[tokio::test]
async fn permissions_approval_response_maps_to_network_trigger() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_permissions_approval_request(/*request_id*/ 61)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_permissions_approval_response(/*request_id*/ 61)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_params"]["tool_kind"], "permissions");
assert_eq!(payload["event_params"]["trigger"], "network_retry");
assert_eq!(payload["event_params"]["status"], "approved_for_session");
}
#[tokio::test]
async fn guardian_completed_notification_publishes_review_event_with_thread_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_guardian_review_started(
"guardian-review-1",
Some("item-1"),
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_guardian_review_completed(
"guardian-review-1",
Some("item-1"),
GuardianApprovalReviewStatus::Denied,
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_type"], "codex_tool_call_review_event");
assert_eq!(payload["event_params"]["review_id"], "guardian-review-1");
assert_eq!(payload["event_params"]["item_id"], "item-1");
assert_eq!(payload["event_params"]["thread_source"], "user");
assert_eq!(payload["event_params"]["tool_kind"], "command_execution");
assert_eq!(payload["event_params"]["reviewer"], "guardian");
assert_eq!(payload["event_params"]["status"], "denied");
assert!(payload["event_params"]["duration_ms"].as_u64().is_some());
}
#[tokio::test]
async fn guardian_completed_without_started_uses_null_duration() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_guardian_review_completed(
"guardian-review-2",
/*target_item_id*/ None,
GuardianApprovalReviewStatus::TimedOut,
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize review event");
assert_eq!(payload["event_params"]["item_id"], json!(null));
assert_eq!(payload["event_params"]["status"], "timed_out");
assert_eq!(payload["event_params"]["duration_ms"], json!(null));
}
#[tokio::test]
async fn terminal_reviews_denormalize_counts_onto_tool_item_events() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(sample_command_approval_request(
/*request_id*/ 71, /*approval_id*/ None,
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ServerResponse {
response: Box::new(sample_command_approval_response(
/*request_id*/ 71,
CommandExecutionApprovalDecision::AcceptForSession,
)),
},
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted(
ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: 1_000,
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
},
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: 1_042,
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events[0]).expect("serialize tool item event");
assert_eq!(payload["event_params"]["review_count"], 1);
assert_eq!(payload["event_params"]["user_review_count"], 1);
assert_eq!(payload["event_params"]["guardian_review_count"], 0);
assert_eq!(
payload["event_params"]["final_approval_outcome"],
"user_approved_for_session"
);
}
#[test]
fn subagent_thread_started_review_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
@@ -1572,6 +2392,79 @@ async fn subagent_thread_started_inherits_parent_connection_for_new_thread() {
);
}
#[tokio::test]
async fn subagent_tool_items_inherit_parent_connection_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-subagent".to_string(),
parent_thread_id: Some("thread-1".to_string()),
product_client_id: "codex-tui".to_string(),
client_name: "codex-tui".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::Review,
created_at: 128,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted(
ItemStartedNotification {
thread_id: "thread-subagent".to_string(),
turn_id: "turn-subagent".to_string(),
started_at_ms: 1_000,
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
},
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-subagent".to_string(),
turn_id: "turn-subagent".to_string(),
completed_at_ms: 1_042,
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_command_execution_event");
assert_eq!(payload[0]["event_params"]["thread_source"], "subagent");
assert_eq!(payload[0]["event_params"]["subagent_source"], "review");
assert_eq!(payload[0]["event_params"]["parent_thread_id"], "thread-1");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["client_name"],
"codex-tui"
);
}
#[test]
fn plugin_used_event_serializes_expected_shape() {
let tracking = TrackEventsContext {

View File

@@ -333,10 +333,6 @@ impl AnalyticsEventsClient {
});
}
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
@@ -349,6 +345,21 @@ impl AnalyticsEventsClient {
response: Box::new(response),
});
}
pub fn track_notification(&self, notification: ServerNotification) {
if !matches!(
notification,
ServerNotification::TurnStarted(_)
| ServerNotification::TurnCompleted(_)
| ServerNotification::ItemStarted(_)
| ServerNotification::ItemCompleted(_)
| ServerNotification::ItemGuardianApprovalReviewStarted(_)
| ServerNotification::ItemGuardianApprovalReviewCompleted(_)
) {
return;
}
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
}
async fn send_track_events(

View File

@@ -61,6 +61,22 @@ pub(crate) enum TrackEventRequest {
Compaction(Box<CodexCompactionEventRequest>),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
#[allow(dead_code)]
CommandExecution(CodexCommandExecutionEventRequest),
#[allow(dead_code)]
FileChange(CodexFileChangeEventRequest),
#[allow(dead_code)]
McpToolCall(CodexMcpToolCallEventRequest),
#[allow(dead_code)]
DynamicToolCall(CodexDynamicToolCallEventRequest),
#[allow(dead_code)]
CollabAgentToolCall(CodexCollabAgentToolCallEventRequest),
#[allow(dead_code)]
WebSearch(CodexWebSearchEventRequest),
#[allow(dead_code)]
ImageGeneration(CodexImageGenerationEventRequest),
#[allow(dead_code)]
ToolCallReview(CodexToolCallReviewEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
@@ -384,6 +400,282 @@ pub(crate) struct GuardianReviewEventPayload {
pub(crate) guardian_review: GuardianReviewEventParams,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemFinalApprovalOutcome {
Unknown,
NotNeeded,
ConfigAllowed,
PolicyForbidden,
GuardianApproved,
GuardianDenied,
GuardianAborted,
UserApproved,
UserApprovedForSession,
UserDenied,
UserAborted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemTerminalStatus {
Completed,
Failed,
Rejected,
Interrupted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemFailureKind {
ToolError,
ApprovalDenied,
ApprovalAborted,
SandboxDenied,
PolicyForbidden,
}
#[derive(Serialize)]
pub(crate) struct CodexToolItemEventBase {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
/// App-server ThreadItem.id. For tool-originated items this generally
/// corresponds to the originating core call_id.
pub(crate) item_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<&'static str>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) tool_name: String,
pub(crate) started_at_ms: u64,
pub(crate) completed_at_ms: u64,
pub(crate) duration_ms: Option<u64>,
pub(crate) review_count: u64,
pub(crate) guardian_review_count: u64,
pub(crate) user_review_count: u64,
pub(crate) final_approval_outcome: ToolItemFinalApprovalOutcome,
pub(crate) terminal_status: ToolItemTerminalStatus,
pub(crate) failure_kind: Option<ToolItemFailureKind>,
pub(crate) requested_additional_permissions: bool,
pub(crate) requested_network_access: bool,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolReviewToolKind {
CommandExecution,
FileChange,
McpToolCall,
Permissions,
NetworkAccess,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolReviewReviewer {
Guardian,
User,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolReviewTrigger {
Initial,
SandboxRetry,
NetworkRetry,
SubcommandExecve,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolReviewStatus {
Approved,
ApprovedForSession,
ApprovedExecpolicyAmendment,
NetworkPolicyAllow,
NetworkPolicyDeny,
Denied,
Aborted,
TimedOut,
}
#[derive(Serialize)]
pub(crate) struct CodexToolCallReviewEventParams {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
pub(crate) item_id: Option<String>,
pub(crate) review_id: String,
pub(crate) thread_source: Option<&'static str>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) tool_kind: ToolReviewToolKind,
pub(crate) tool_name: String,
pub(crate) reviewer: ToolReviewReviewer,
pub(crate) trigger: ToolReviewTrigger,
pub(crate) status: ToolReviewStatus,
pub(crate) created_at: u64,
pub(crate) completed_at: Option<u64>,
pub(crate) duration_ms: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexToolCallReviewEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexToolCallReviewEventParams,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionFamily {
Shell,
UserShell,
UnifiedExec,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionSource {
Agent,
UserShell,
UnifiedExecStartup,
UnifiedExecInteraction,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum WebSearchActionKind {
Search,
OpenPage,
FindInPage,
Other,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) command_execution_source: CommandExecutionSource,
pub(crate) command_execution_family: CommandExecutionFamily,
pub(crate) exit_code: Option<i32>,
pub(crate) command_action_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCommandExecutionEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) file_change_count: u64,
pub(crate) file_add_count: u64,
pub(crate) file_update_count: u64,
pub(crate) file_delete_count: u64,
pub(crate) file_move_count: u64,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexFileChangeEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) mcp_server_name: String,
pub(crate) mcp_tool_name: String,
pub(crate) mcp_error_present: bool,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexMcpToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) dynamic_tool_name: String,
pub(crate) success: Option<bool>,
pub(crate) output_content_item_count: Option<u64>,
pub(crate) output_text_item_count: Option<u64>,
pub(crate) output_image_item_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexDynamicToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) sender_thread_id: String,
pub(crate) receiver_thread_count: u64,
pub(crate) receiver_thread_ids: Option<Vec<String>>,
pub(crate) requested_model: Option<String>,
pub(crate) requested_reasoning_effort: Option<String>,
pub(crate) agent_state_count: Option<u64>,
pub(crate) completed_agent_count: Option<u64>,
pub(crate) failed_agent_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCollabAgentToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) web_search_action: Option<WebSearchActionKind>,
pub(crate) query_present: bool,
pub(crate) query_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexWebSearchEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) image_generation_status: String,
pub(crate) revised_prompt_present: bool,
pub(crate) saved_path_present: bool,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexImageGenerationEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexAppMetadata {
pub(crate) connector_id: Option<String>,

View File

@@ -51,3 +51,27 @@ pub fn now_unix_seconds() -> u64 {
.unwrap_or_default()
.as_secs()
}
pub fn now_unix_millis() -> u64 {
u64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
)
.unwrap_or(u64::MAX)
}
pub(crate) fn serialize_enum_as_string<T: serde::Serialize>(value: &T) -> Option<String> {
serde_json::to_value(value)
.ok()
.and_then(|value| value.as_str().map(str::to_string))
}
pub(crate) fn usize_to_u64(value: usize) -> u64 {
u64::try_from(value).unwrap_or(u64::MAX)
}
pub(crate) fn option_i64_to_u64(value: Option<i64>) -> Option<u64> {
value.and_then(|value| u64::try_from(value).ok())
}

File diff suppressed because it is too large Load Diff

View File

@@ -12,7 +12,6 @@ use crate::thread_state::TurnSummary;
use crate::thread_state::resolve_server_request_on_thread_listener;
use crate::thread_status::ThreadWatchActiveGuard;
use crate::thread_status::ThreadWatchManager;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile;
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
@@ -141,7 +140,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_manager: Arc<ThreadManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<tokio::sync::Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
@@ -176,10 +174,6 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn,
};
if let Some(analytics_events_client) = analytics_events_client.as_ref() {
analytics_events_client
.track_notification(ServerNotification::TurnStarted(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::TurnStarted(notification))
.await;
@@ -196,7 +190,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_complete_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -238,10 +231,6 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: Some(conversation_id.to_string()),
message: warning_event.message,
};
if let Some(analytics_events_client) = analytics_events_client.as_ref() {
analytics_events_client
.track_notification(ServerNotification::Warning(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::Warning(notification))
.await;
@@ -251,10 +240,6 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
message: warning_event.message,
};
if let Some(analytics_events_client) = analytics_events_client.as_ref() {
analytics_events_client
.track_notification(ServerNotification::GuardianWarning(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::GuardianWarning(notification))
.await;
@@ -1139,7 +1124,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_aborted_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -1328,7 +1312,6 @@ async fn emit_turn_completed_with_status(
conversation_id: ThreadId,
event_turn_id: String,
turn_completion_metadata: TurnCompletionMetadata,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
@@ -1343,10 +1326,6 @@ async fn emit_turn_completed_with_status(
duration_ms: turn_completion_metadata.duration_ms,
},
};
if let Some(analytics_events_client) = analytics_events_client {
analytics_events_client
.track_notification(ServerNotification::TurnCompleted(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::TurnCompleted(notification))
.await;
@@ -1510,7 +1489,6 @@ async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
turn_complete_event: TurnCompleteEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -1531,7 +1509,6 @@ async fn handle_turn_complete(
completed_at: turn_complete_event.completed_at,
duration_ms: turn_complete_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -1541,7 +1518,6 @@ async fn handle_turn_interrupted(
conversation_id: ThreadId,
event_turn_id: String,
turn_aborted_event: TurnAbortedEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -1557,7 +1533,6 @@ async fn handle_turn_interrupted(
completed_at: turn_aborted_event.completed_at,
duration_ms: turn_aborted_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -2109,7 +2084,6 @@ mod tests {
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::TurnPlanStepStatus;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::items::HookPromptFragment;
use codex_protocol::items::build_hook_prompt_message;
@@ -2240,7 +2214,6 @@ mod tests {
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
analytics_events_client: AnalyticsEventsClient,
codex_home: PathBuf,
}
@@ -2255,7 +2228,6 @@ mod tests {
self.conversation_id,
self.conversation.clone(),
self.thread_manager.clone(),
Some(self.analytics_events_client.clone()),
self.outgoing.clone(),
self.thread_state.clone(),
self.thread_watch_manager.clone(),
@@ -2589,13 +2561,6 @@ mod tests {
outgoing: outgoing.clone(),
thread_state: thread_state.clone(),
thread_watch_manager: thread_watch_manager.clone(),
analytics_events_client: AnalyticsEventsClient::new(
AuthManager::from_auth_for_testing(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
),
"http://localhost".to_string(),
Some(false),
),
codex_home: codex_home.path().to_path_buf(),
};
@@ -3175,7 +3140,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3227,7 +3191,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3278,7 +3241,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3513,7 +3475,6 @@ mod tests {
conversation_a,
a_turn1.clone(),
turn_complete_event(&a_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3535,7 +3496,6 @@ mod tests {
conversation_b,
b_turn1.clone(),
turn_complete_event(&b_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3547,7 +3507,6 @@ mod tests {
conversation_a,
a_turn2.clone(),
turn_complete_event(&a_turn2),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)

View File

@@ -141,6 +141,9 @@ impl ThreadScopedOutgoingMessageSender {
}
pub(crate) async fn send_server_notification(&self, notification: ServerNotification) {
self.outgoing
.analytics_events_client
.track_notification(notification.clone());
if self.connection_ids.is_empty() {
return;
}
@@ -524,7 +527,7 @@ impl OutgoingMessageSender {
targeted_connections = connection_ids.len(),
"app-server event: {notification}"
);
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
if connection_ids.is_empty() {
if let Err(err) = self
.sender
@@ -558,7 +561,7 @@ impl OutgoingMessageSender {
notification: ServerNotification,
) {
tracing::trace!("app-server event: {notification}");
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
let (write_complete_tx, write_complete_rx) = oneshot::channel();
if let Err(err) = self
.sender

View File

@@ -249,7 +249,7 @@ pub(super) async fn ensure_listener_task_running(
thread_manager,
thread_state_manager,
pending_thread_unloads,
analytics_events_client,
analytics_events_client: _,
thread_watch_manager,
thread_list_state_permit,
fallback_model_provider,
@@ -325,7 +325,6 @@ pub(super) async fn ensure_listener_task_running(
conversation_id,
conversation.clone(),
thread_manager.clone(),
Some(analytics_events_client.clone()),
thread_outgoing,
thread_state.clone(),
thread_watch_manager.clone(),