mirror of
https://github.com/openai/codex.git
synced 2026-04-07 06:14:48 +00:00
Compare commits
2 Commits
bot/update
...
pr16640
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6753245d6 | ||
|
|
a860b59e5d |
@@ -6,6 +6,7 @@ use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexTurnEventRequest;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
@@ -27,28 +28,46 @@ use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::reducer::normalize_path_for_skill_id;
|
||||
use crate::reducer::skill_id_for_local_skill;
|
||||
use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
|
||||
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
use codex_app_server_protocol::ThreadTokenUsage;
|
||||
use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification;
|
||||
use codex_app_server_protocol::TokenUsageBreakdown;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnError as AppServerTurnError;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus as AppServerTurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_login::default_client::DEFAULT_ORIGINATOR;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::AppConnectorId;
|
||||
use codex_plugin::PluginCapabilitySummary;
|
||||
use codex_plugin::PluginId;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
@@ -114,6 +133,218 @@ fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str)
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
params: TurnStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: vec![
|
||||
UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: vec![],
|
||||
},
|
||||
UserInput::Image {
|
||||
url: "https://example.com/a.png".to_string(),
|
||||
},
|
||||
],
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_start_response(turn_id: &str, request_id: i64) -> ClientResponse {
|
||||
ClientResponse::TurnStart {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
response: codex_app_server_protocol::TurnStartResponse {
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_started_notification(thread_id: &str, turn_id: &str) -> ServerNotification {
|
||||
ServerNotification::TurnStarted(TurnStartedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: Some(455),
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_thread_token_usage_updated_notification(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
) -> ServerNotification {
|
||||
ServerNotification::ThreadTokenUsageUpdated(ThreadTokenUsageUpdatedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn_id: turn_id.to_string(),
|
||||
token_usage: ThreadTokenUsage {
|
||||
total: TokenUsageBreakdown {
|
||||
total_tokens: 500,
|
||||
input_tokens: 200,
|
||||
cached_input_tokens: 50,
|
||||
output_tokens: 220,
|
||||
reasoning_output_tokens: 30,
|
||||
},
|
||||
last: TokenUsageBreakdown {
|
||||
total_tokens: 321,
|
||||
input_tokens: 123,
|
||||
cached_input_tokens: 45,
|
||||
output_tokens: 140,
|
||||
reasoning_output_tokens: 13,
|
||||
},
|
||||
model_context_window: Some(200_000),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_completed_notification(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
status: AppServerTurnStatus,
|
||||
codex_error_info: Option<codex_app_server_protocol::CodexErrorInfo>,
|
||||
) -> ServerNotification {
|
||||
ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn: Turn {
|
||||
id: turn_id.to_string(),
|
||||
items: vec![],
|
||||
status,
|
||||
error: codex_error_info.map(|codex_error_info| AppServerTurnError {
|
||||
message: "turn failed".to_string(),
|
||||
codex_error_info: Some(codex_error_info),
|
||||
additional_details: None,
|
||||
}),
|
||||
started_at: None,
|
||||
completed_at: Some(456),
|
||||
duration_ms: Some(1234),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_turn_resolved_config(turn_id: &str) -> TurnResolvedConfigFact {
|
||||
TurnResolvedConfigFact {
|
||||
turn_id: turn_id.to_string(),
|
||||
thread_id: "thread-2".to_string(),
|
||||
num_input_images: 1,
|
||||
submission_type: None,
|
||||
model: "gpt-5".to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
reasoning_effort: None,
|
||||
reasoning_summary: None,
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::OnRequest,
|
||||
approvals_reviewer: ApprovalsReviewer::GuardianSubagent,
|
||||
sandbox_network_access: true,
|
||||
collaboration_mode: ModeKind::Plan,
|
||||
personality: None,
|
||||
is_first_turn: true,
|
||||
}
|
||||
}
|
||||
|
||||
async fn ingest_turn_prerequisites(
|
||||
reducer: &mut AnalyticsReducer,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
include_initialize: bool,
|
||||
include_resolved_config: bool,
|
||||
include_started: bool,
|
||||
include_token_usage: bool,
|
||||
) {
|
||||
if include_initialize {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.1.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
},
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
},
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
|
||||
if include_resolved_config {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
|
||||
sample_turn_resolved_config("turn-2"),
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if include_started {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_started_notification(
|
||||
"thread-2", "turn-2",
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if include_token_usage {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(
|
||||
sample_thread_token_usage_updated_notification("thread-2", "turn-2"),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn expected_absolute_path(path: &PathBuf) -> String {
|
||||
std::fs::canonicalize(path)
|
||||
.unwrap_or_else(|_| path.to_path_buf())
|
||||
@@ -823,6 +1054,245 @@ async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest {
|
||||
event_type: "codex_turn_event",
|
||||
event_params: crate::events::CodexTurnEventParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
turn_id: "turn-2".to_string(),
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
submission_type: None,
|
||||
model: Some("gpt-5".to_string()),
|
||||
model_provider: "openai".to_string(),
|
||||
sandbox_policy: Some("read_only"),
|
||||
reasoning_effort: Some("high".to_string()),
|
||||
reasoning_summary: Some("detailed".to_string()),
|
||||
service_tier: "flex".to_string(),
|
||||
approval_policy: "on-request".to_string(),
|
||||
approvals_reviewer: "guardian_subagent".to_string(),
|
||||
sandbox_network_access: true,
|
||||
collaboration_mode: Some("plan"),
|
||||
personality: Some("pragmatic".to_string()),
|
||||
num_input_images: 2,
|
||||
is_first_turn: true,
|
||||
status: Some(TurnStatus::Completed),
|
||||
turn_error: None,
|
||||
steer_count: None,
|
||||
total_tool_call_count: None,
|
||||
shell_command_count: None,
|
||||
file_change_count: None,
|
||||
mcp_tool_call_count: None,
|
||||
dynamic_tool_call_count: None,
|
||||
subagent_tool_call_count: None,
|
||||
web_search_count: None,
|
||||
image_generation_count: None,
|
||||
input_tokens: None,
|
||||
cached_input_tokens: None,
|
||||
output_tokens: None,
|
||||
reasoning_output_tokens: None,
|
||||
total_tokens: None,
|
||||
duration_ms: Some(1234),
|
||||
started_at: Some(455),
|
||||
completed_at: Some(456),
|
||||
},
|
||||
}));
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize turn event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_turn_event",
|
||||
"event_params": {
|
||||
"thread_id": "thread-2",
|
||||
"turn_id": "turn-2",
|
||||
"product_client_id": "codex-tui",
|
||||
"submission_type": null,
|
||||
"model": "gpt-5",
|
||||
"model_provider": "openai",
|
||||
"sandbox_policy": "read_only",
|
||||
"reasoning_effort": "high",
|
||||
"reasoning_summary": "detailed",
|
||||
"service_tier": "flex",
|
||||
"approval_policy": "on-request",
|
||||
"approvals_reviewer": "guardian_subagent",
|
||||
"sandbox_network_access": true,
|
||||
"collaboration_mode": "plan",
|
||||
"personality": "pragmatic",
|
||||
"num_input_images": 2,
|
||||
"is_first_turn": true,
|
||||
"status": "completed",
|
||||
"turn_error": null,
|
||||
"steer_count": null,
|
||||
"total_tool_call_count": null,
|
||||
"shell_command_count": null,
|
||||
"file_change_count": null,
|
||||
"mcp_tool_call_count": null,
|
||||
"dynamic_tool_call_count": null,
|
||||
"subagent_tool_call_count": null,
|
||||
"web_search_count": null,
|
||||
"image_generation_count": null,
|
||||
"input_tokens": null,
|
||||
"cached_input_tokens": null,
|
||||
"output_tokens": null,
|
||||
"reasoning_output_tokens": null,
|
||||
"total_tokens": null,
|
||||
"duration_ms": 1234,
|
||||
"started_at": 455,
|
||||
"completed_at": 456
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_lifecycle_emits_turn_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ true,
|
||||
/*include_token_usage*/ true,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(out.len(), 1);
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
|
||||
assert_eq!(payload["event_type"], json!("codex_turn_event"));
|
||||
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
|
||||
assert_eq!(payload["event_params"]["turn_id"], json!("turn-2"));
|
||||
assert_eq!(
|
||||
payload["event_params"]["product_client_id"],
|
||||
json!("codex-tui")
|
||||
);
|
||||
assert_eq!(payload["event_params"]["num_input_images"], json!(1));
|
||||
assert_eq!(payload["event_params"]["status"], json!("completed"));
|
||||
assert_eq!(payload["event_params"]["started_at"], json!(455));
|
||||
assert_eq!(payload["event_params"]["completed_at"], json!(456));
|
||||
assert_eq!(payload["event_params"]["duration_ms"], json!(1234));
|
||||
assert_eq!(payload["event_params"]["input_tokens"], json!(123));
|
||||
assert_eq!(payload["event_params"]["cached_input_tokens"], json!(45));
|
||||
assert_eq!(payload["event_params"]["output_tokens"], json!(140));
|
||||
assert_eq!(
|
||||
payload["event_params"]["reasoning_output_tokens"],
|
||||
json!(13)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_does_not_emit_without_required_prerequisites() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ false,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ false,
|
||||
/*include_token_usage*/ false,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(out.len(), 1);
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
|
||||
assert_eq!(
|
||||
payload["event_params"]["product_client_id"],
|
||||
json!(originator().value)
|
||||
);
|
||||
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ false,
|
||||
/*include_started*/ false,
|
||||
/*include_token_usage*/ false,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
assert!(out.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_completed_without_started_notification_emits_null_started_at() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ false,
|
||||
/*include_token_usage*/ false,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
|
||||
assert_eq!(payload["event_params"]["started_at"], json!(null));
|
||||
assert_eq!(payload["event_params"]["duration_ms"], json!(1234));
|
||||
assert_eq!(payload["event_params"]["input_tokens"], json!(null));
|
||||
assert_eq!(payload["event_params"]["cached_input_tokens"], json!(null));
|
||||
assert_eq!(payload["event_params"]["output_tokens"], json!(null));
|
||||
assert_eq!(
|
||||
payload["event_params"]["reasoning_output_tokens"],
|
||||
json!(null)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["total_tokens"], json!(null));
|
||||
}
|
||||
|
||||
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
|
||||
PluginTelemetryMetadata {
|
||||
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),
|
||||
|
||||
@@ -13,9 +13,13 @@ use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
@@ -160,6 +164,14 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
|
||||
self.record_fact(AnalyticsFact::Request {
|
||||
connection_id,
|
||||
request_id,
|
||||
request: Box::new(request),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
@@ -178,6 +190,12 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
@@ -227,6 +245,10 @@ impl AnalyticsEventsClient {
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_notification(&self, notification: ServerNotification) {
|
||||
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
|
||||
@@ -3,6 +3,9 @@ use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSubmissionType;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -37,6 +40,7 @@ pub(crate) enum TrackEventRequest {
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
TurnEvent(Box<CodexTurnEventRequest>),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
@@ -122,6 +126,52 @@ pub(crate) struct CodexAppUsedEventRequest {
|
||||
pub(crate) event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexTurnEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) turn_id: String,
|
||||
pub(crate) product_client_id: String,
|
||||
pub(crate) submission_type: Option<TurnSubmissionType>,
|
||||
pub(crate) model: Option<String>,
|
||||
pub(crate) model_provider: String,
|
||||
pub(crate) sandbox_policy: Option<&'static str>,
|
||||
pub(crate) reasoning_effort: Option<String>,
|
||||
pub(crate) reasoning_summary: Option<String>,
|
||||
pub(crate) service_tier: String,
|
||||
pub(crate) approval_policy: String,
|
||||
pub(crate) approvals_reviewer: String,
|
||||
pub(crate) sandbox_network_access: bool,
|
||||
pub(crate) collaboration_mode: Option<&'static str>,
|
||||
pub(crate) personality: Option<String>,
|
||||
pub(crate) num_input_images: usize,
|
||||
pub(crate) is_first_turn: bool,
|
||||
pub(crate) status: Option<TurnStatus>,
|
||||
pub(crate) turn_error: Option<CodexErrorInfo>,
|
||||
pub(crate) steer_count: Option<usize>,
|
||||
pub(crate) total_tool_call_count: Option<usize>,
|
||||
pub(crate) shell_command_count: Option<usize>,
|
||||
pub(crate) file_change_count: Option<usize>,
|
||||
pub(crate) mcp_tool_call_count: Option<usize>,
|
||||
pub(crate) dynamic_tool_call_count: Option<usize>,
|
||||
pub(crate) subagent_tool_call_count: Option<usize>,
|
||||
pub(crate) web_search_count: Option<usize>,
|
||||
pub(crate) image_generation_count: Option<usize>,
|
||||
pub(crate) input_tokens: Option<i64>,
|
||||
pub(crate) cached_input_tokens: Option<i64>,
|
||||
pub(crate) output_tokens: Option<i64>,
|
||||
pub(crate) reasoning_output_tokens: Option<i64>,
|
||||
pub(crate) total_tokens: Option<i64>,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
pub(crate) started_at: Option<u64>,
|
||||
pub(crate) completed_at: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexTurnEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexTurnEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginMetadata {
|
||||
pub(crate) plugin_id: Option<String>,
|
||||
|
||||
@@ -6,6 +6,14 @@ use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::ServiceTier;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use serde::Serialize;
|
||||
@@ -30,6 +38,41 @@ pub fn build_track_events_context(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TurnResolvedConfigFact {
|
||||
pub turn_id: String,
|
||||
pub thread_id: String,
|
||||
pub num_input_images: usize,
|
||||
pub submission_type: Option<TurnSubmissionType>,
|
||||
pub model: String,
|
||||
pub model_provider: String,
|
||||
pub sandbox_policy: SandboxPolicy,
|
||||
pub reasoning_effort: Option<ReasoningEffort>,
|
||||
pub reasoning_summary: Option<ReasoningSummary>,
|
||||
pub service_tier: Option<ServiceTier>,
|
||||
pub approval_policy: AskForApproval,
|
||||
pub approvals_reviewer: ApprovalsReviewer,
|
||||
pub sandbox_network_access: bool,
|
||||
pub collaboration_mode: ModeKind,
|
||||
pub personality: Option<Personality>,
|
||||
pub is_first_turn: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TurnSubmissionType {
|
||||
Default,
|
||||
Queued,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TurnStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SkillInvocation {
|
||||
pub skill_name: String,
|
||||
@@ -89,6 +132,7 @@ pub(crate) enum AnalyticsFact {
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
|
||||
@@ -10,6 +10,9 @@ pub use facts::InvocationType;
|
||||
pub use facts::SkillInvocation;
|
||||
pub use facts::SubAgentThreadStartedInput;
|
||||
pub use facts::TrackEventsContext;
|
||||
pub use facts::TurnResolvedConfigFact;
|
||||
pub use facts::TurnStatus;
|
||||
pub use facts::TurnSubmissionType;
|
||||
pub use facts::build_track_events_context;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -5,6 +5,8 @@ use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexTurnEventParams;
|
||||
use crate::events::CodexTurnEventRequest;
|
||||
use crate::events::SkillInvocationEventParams;
|
||||
use crate::events::SkillInvocationEventRequest;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
@@ -26,11 +28,23 @@ use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::TokenUsageBreakdown;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use sha1::Digest;
|
||||
@@ -39,6 +53,8 @@ use std::path::Path;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AnalyticsReducer {
|
||||
requests: HashMap<(u64, RequestId), RequestState>,
|
||||
turns: HashMap<String, TurnState>,
|
||||
connections: HashMap<u64, ConnectionState>,
|
||||
}
|
||||
|
||||
@@ -47,6 +63,33 @@ struct ConnectionState {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
}
|
||||
|
||||
enum RequestState {
|
||||
TurnStart(PendingTurnStartState),
|
||||
}
|
||||
|
||||
struct PendingTurnStartState {
|
||||
thread_id: String,
|
||||
num_input_images: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CompletedTurnState {
|
||||
status: Option<TurnStatus>,
|
||||
turn_error: Option<CodexErrorInfo>,
|
||||
completed_at: u64,
|
||||
duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
struct TurnState {
|
||||
connection_id: Option<u64>,
|
||||
thread_id: Option<String>,
|
||||
num_input_images: Option<usize>,
|
||||
resolved_config: Option<TurnResolvedConfigFact>,
|
||||
started_at: Option<u64>,
|
||||
token_usage: Option<TokenUsageBreakdown>,
|
||||
completed: Option<CompletedTurnState>,
|
||||
}
|
||||
|
||||
impl AnalyticsReducer {
|
||||
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
|
||||
match input {
|
||||
@@ -66,21 +109,28 @@ impl AnalyticsReducer {
|
||||
);
|
||||
}
|
||||
AnalyticsFact::Request {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
connection_id,
|
||||
request_id,
|
||||
request,
|
||||
} => {
|
||||
self.ingest_request(connection_id, request_id, *request);
|
||||
}
|
||||
AnalyticsFact::Response {
|
||||
connection_id,
|
||||
response,
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::Notification(notification) => {
|
||||
self.ingest_notification(*notification, out);
|
||||
}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::TurnResolvedConfig(input) => {
|
||||
self.ingest_turn_resolved_config(*input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
@@ -135,6 +185,53 @@ impl AnalyticsReducer {
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_request(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: ClientRequest,
|
||||
) {
|
||||
let ClientRequest::TurnStart { params, .. } = request else {
|
||||
return;
|
||||
};
|
||||
self.requests.insert(
|
||||
(connection_id, request_id),
|
||||
RequestState::TurnStart(PendingTurnStartState {
|
||||
thread_id: params.thread_id,
|
||||
num_input_images: params
|
||||
.input
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
|
||||
})
|
||||
.count(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
fn ingest_turn_resolved_config(
|
||||
&mut self,
|
||||
input: TurnResolvedConfigFact,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let turn_id = input.turn_id.clone();
|
||||
let thread_id = input.thread_id.clone();
|
||||
let num_input_images = input.num_input_images;
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
});
|
||||
turn_state.thread_id = Some(thread_id);
|
||||
turn_state.num_input_images = Some(num_input_images);
|
||||
turn_state.resolved_config = Some(input);
|
||||
self.maybe_emit_turn_event(&turn_id, out);
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
@@ -235,24 +332,139 @@ impl AnalyticsReducer {
|
||||
response: ClientResponse,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let (thread, model, initialization_mode) = match response {
|
||||
ClientResponse::ThreadStart { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::New,
|
||||
),
|
||||
ClientResponse::ThreadResume { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Resumed,
|
||||
),
|
||||
ClientResponse::ThreadFork { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Forked,
|
||||
),
|
||||
_ => return,
|
||||
};
|
||||
match response {
|
||||
ClientResponse::ThreadStart { response, .. } => {
|
||||
self.emit_thread_initialized(
|
||||
connection_id,
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::New,
|
||||
out,
|
||||
);
|
||||
}
|
||||
ClientResponse::ThreadResume { response, .. } => {
|
||||
self.emit_thread_initialized(
|
||||
connection_id,
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Resumed,
|
||||
out,
|
||||
);
|
||||
}
|
||||
ClientResponse::ThreadFork { response, .. } => {
|
||||
self.emit_thread_initialized(
|
||||
connection_id,
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Forked,
|
||||
out,
|
||||
);
|
||||
}
|
||||
ClientResponse::TurnStart {
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
let turn_id = response.turn.id;
|
||||
let Some(RequestState::TurnStart(pending_request)) =
|
||||
self.requests.remove(&(connection_id, request_id))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
});
|
||||
turn_state.connection_id = Some(connection_id);
|
||||
turn_state.thread_id = Some(pending_request.thread_id);
|
||||
turn_state.num_input_images = Some(pending_request.num_input_images);
|
||||
self.maybe_emit_turn_event(&turn_id, out);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_notification(
|
||||
&mut self,
|
||||
notification: ServerNotification,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
match notification {
|
||||
ServerNotification::TurnStarted(notification) => {
|
||||
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
});
|
||||
turn_state.started_at = notification
|
||||
.turn
|
||||
.started_at
|
||||
.and_then(|started_at| u64::try_from(started_at).ok());
|
||||
}
|
||||
ServerNotification::ThreadTokenUsageUpdated(notification) => {
|
||||
let turn_state = self.turns.entry(notification.turn_id).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
});
|
||||
turn_state.token_usage = Some(notification.token_usage.last);
|
||||
}
|
||||
ServerNotification::TurnCompleted(notification) => {
|
||||
let turn_state =
|
||||
self.turns
|
||||
.entry(notification.turn.id.clone())
|
||||
.or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
});
|
||||
turn_state.completed = Some(CompletedTurnState {
|
||||
status: analytics_turn_status(notification.turn.status),
|
||||
turn_error: notification
|
||||
.turn
|
||||
.error
|
||||
.and_then(|error| error.codex_error_info),
|
||||
completed_at: notification
|
||||
.turn
|
||||
.completed_at
|
||||
.and_then(|completed_at| u64::try_from(completed_at).ok())
|
||||
.unwrap_or_default(),
|
||||
duration_ms: notification
|
||||
.turn
|
||||
.duration_ms
|
||||
.and_then(|duration_ms| u64::try_from(duration_ms).ok()),
|
||||
});
|
||||
let turn_id = notification.turn.id;
|
||||
self.maybe_emit_turn_event(&turn_id, out);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_thread_initialized(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
thread: codex_app_server_protocol::Thread,
|
||||
model: String,
|
||||
initialization_mode: ThreadInitializationMode,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let thread_source: SessionSource = thread.source.into();
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
@@ -275,6 +487,159 @@ impl AnalyticsReducer {
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec<TrackEventRequest>) {
|
||||
let Some(turn_state) = self.turns.get(turn_id) else {
|
||||
return;
|
||||
};
|
||||
if turn_state.thread_id.is_none()
|
||||
|| turn_state.num_input_images.is_none()
|
||||
|| turn_state.resolved_config.is_none()
|
||||
|| turn_state.completed.is_none()
|
||||
{
|
||||
return;
|
||||
}
|
||||
let product_client_id = turn_state
|
||||
.connection_id
|
||||
.and_then(|connection_id| self.connections.get(&connection_id))
|
||||
.map(|connection_state| connection_state.app_server_client.product_client_id.clone())
|
||||
.unwrap_or_else(|| originator().value);
|
||||
out.push(TrackEventRequest::TurnEvent(Box::new(
|
||||
CodexTurnEventRequest {
|
||||
event_type: "codex_turn_event",
|
||||
event_params: codex_turn_event_params(
|
||||
product_client_id,
|
||||
turn_id.to_string(),
|
||||
turn_state,
|
||||
),
|
||||
},
|
||||
)));
|
||||
self.turns.remove(turn_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_turn_event_params(
|
||||
product_client_id: String,
|
||||
turn_id: String,
|
||||
turn_state: &TurnState,
|
||||
) -> CodexTurnEventParams {
|
||||
let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = (
|
||||
turn_state.thread_id.clone(),
|
||||
turn_state.num_input_images,
|
||||
turn_state.resolved_config.clone(),
|
||||
turn_state.completed.clone(),
|
||||
) else {
|
||||
unreachable!("turn event params require a fully populated turn state");
|
||||
};
|
||||
let started_at = turn_state.started_at;
|
||||
let TurnResolvedConfigFact {
|
||||
turn_id: _resolved_turn_id,
|
||||
thread_id: _resolved_thread_id,
|
||||
num_input_images: _resolved_num_input_images,
|
||||
submission_type,
|
||||
model,
|
||||
model_provider,
|
||||
sandbox_policy,
|
||||
reasoning_effort,
|
||||
reasoning_summary,
|
||||
service_tier,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_network_access,
|
||||
collaboration_mode,
|
||||
personality,
|
||||
is_first_turn,
|
||||
} = resolved_config;
|
||||
let token_usage = turn_state.token_usage.clone();
|
||||
CodexTurnEventParams {
|
||||
thread_id,
|
||||
turn_id,
|
||||
product_client_id,
|
||||
submission_type,
|
||||
model: Some(model),
|
||||
model_provider,
|
||||
sandbox_policy: Some(sandbox_policy_mode(&sandbox_policy)),
|
||||
reasoning_effort: reasoning_effort.map(|value| value.to_string()),
|
||||
reasoning_summary: reasoning_summary_mode(reasoning_summary),
|
||||
service_tier: service_tier
|
||||
.map(|value| value.to_string())
|
||||
.unwrap_or_else(|| "default".to_string()),
|
||||
approval_policy: approval_policy.to_string(),
|
||||
approvals_reviewer: approvals_reviewer.to_string(),
|
||||
sandbox_network_access,
|
||||
collaboration_mode: Some(collaboration_mode_mode(collaboration_mode)),
|
||||
personality: personality_mode(personality),
|
||||
num_input_images,
|
||||
is_first_turn,
|
||||
status: completed.status,
|
||||
turn_error: completed.turn_error,
|
||||
steer_count: None,
|
||||
total_tool_call_count: None,
|
||||
shell_command_count: None,
|
||||
file_change_count: None,
|
||||
mcp_tool_call_count: None,
|
||||
dynamic_tool_call_count: None,
|
||||
subagent_tool_call_count: None,
|
||||
web_search_count: None,
|
||||
image_generation_count: None,
|
||||
input_tokens: token_usage
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.input_tokens),
|
||||
cached_input_tokens: token_usage
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.cached_input_tokens),
|
||||
output_tokens: token_usage
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.output_tokens),
|
||||
reasoning_output_tokens: token_usage
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.reasoning_output_tokens),
|
||||
total_tokens: token_usage
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.total_tokens),
|
||||
duration_ms: completed.duration_ms,
|
||||
started_at,
|
||||
completed_at: Some(completed.completed_at),
|
||||
}
|
||||
}
|
||||
|
||||
fn sandbox_policy_mode(sandbox_policy: &SandboxPolicy) -> &'static str {
|
||||
match sandbox_policy {
|
||||
SandboxPolicy::DangerFullAccess => "full_access",
|
||||
SandboxPolicy::ReadOnly { .. } => "read_only",
|
||||
SandboxPolicy::WorkspaceWrite { .. } => "workspace_write",
|
||||
SandboxPolicy::ExternalSandbox { .. } => "external_sandbox",
|
||||
}
|
||||
}
|
||||
|
||||
fn collaboration_mode_mode(mode: ModeKind) -> &'static str {
|
||||
match mode {
|
||||
ModeKind::Plan => "plan",
|
||||
ModeKind::Default | ModeKind::PairProgramming | ModeKind::Execute => "default",
|
||||
}
|
||||
}
|
||||
|
||||
fn reasoning_summary_mode(summary: Option<ReasoningSummary>) -> Option<String> {
|
||||
match summary {
|
||||
Some(ReasoningSummary::None) | None => None,
|
||||
Some(summary) => Some(summary.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn personality_mode(personality: Option<Personality>) -> Option<String> {
|
||||
match personality {
|
||||
Some(Personality::None) | None => None,
|
||||
Some(personality) => Some(personality.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Option<TurnStatus> {
|
||||
match status {
|
||||
codex_app_server_protocol::TurnStatus::Completed => Some(TurnStatus::Completed),
|
||||
codex_app_server_protocol::TurnStatus::Failed => Some(TurnStatus::Failed),
|
||||
codex_app_server_protocol::TurnStatus::Interrupted => Some(TurnStatus::Interrupted),
|
||||
codex_app_server_protocol::TurnStatus::InProgress => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn skill_id_for_local_skill(
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::thread_state::TurnSummary;
|
||||
use crate::thread_state::resolve_server_request_on_thread_listener;
|
||||
use crate::thread_status::ThreadWatchActiveGuard;
|
||||
use crate::thread_status::ThreadWatchManager;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
|
||||
use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile;
|
||||
use codex_app_server_protocol::AgentMessageDeltaNotification;
|
||||
@@ -166,6 +167,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
conversation_id: ThreadId,
|
||||
conversation: Arc<CodexThread>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
outgoing: ThreadScopedOutgoingMessageSender,
|
||||
thread_state: Arc<tokio::sync::Mutex<ThreadState>>,
|
||||
thread_watch_manager: ThreadWatchManager,
|
||||
@@ -201,6 +203,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn,
|
||||
};
|
||||
analytics_events_client
|
||||
.track_notification(ServerNotification::TurnStarted(notification.clone()));
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::TurnStarted(notification))
|
||||
.await;
|
||||
@@ -217,6 +221,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
conversation_id,
|
||||
event_turn_id,
|
||||
turn_complete_event,
|
||||
Some(&analytics_events_client),
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -1333,8 +1338,14 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::TokenCount(token_count_event) => {
|
||||
handle_token_count_event(conversation_id, event_turn_id, token_count_event, &outgoing)
|
||||
.await;
|
||||
handle_token_count_event(
|
||||
conversation_id,
|
||||
event_turn_id,
|
||||
Some(&analytics_events_client),
|
||||
token_count_event,
|
||||
&outgoing,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
EventMsg::Error(ev) => {
|
||||
thread_watch_manager
|
||||
@@ -1720,6 +1731,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
conversation_id,
|
||||
event_turn_id,
|
||||
turn_aborted_event,
|
||||
Some(&analytics_events_client),
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -1897,6 +1909,7 @@ async fn emit_turn_completed_with_status(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
turn_completion_metadata: TurnCompletionMetadata,
|
||||
analytics_events_client: Option<&AnalyticsEventsClient>,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
) {
|
||||
let notification = TurnCompletedNotification {
|
||||
@@ -1911,6 +1924,10 @@ async fn emit_turn_completed_with_status(
|
||||
duration_ms: turn_completion_metadata.duration_ms,
|
||||
},
|
||||
};
|
||||
if let Some(analytics_events_client) = analytics_events_client {
|
||||
analytics_events_client
|
||||
.track_notification(ServerNotification::TurnCompleted(notification.clone()));
|
||||
}
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::TurnCompleted(notification))
|
||||
.await;
|
||||
@@ -2103,6 +2120,7 @@ async fn handle_turn_complete(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
turn_complete_event: TurnCompleteEvent,
|
||||
analytics_events_client: Option<&AnalyticsEventsClient>,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
) {
|
||||
@@ -2123,6 +2141,7 @@ async fn handle_turn_complete(
|
||||
completed_at: turn_complete_event.completed_at,
|
||||
duration_ms: turn_complete_event.duration_ms,
|
||||
},
|
||||
analytics_events_client,
|
||||
outgoing,
|
||||
)
|
||||
.await;
|
||||
@@ -2132,6 +2151,7 @@ async fn handle_turn_interrupted(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
turn_aborted_event: TurnAbortedEvent,
|
||||
analytics_events_client: Option<&AnalyticsEventsClient>,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
) {
|
||||
@@ -2147,6 +2167,7 @@ async fn handle_turn_interrupted(
|
||||
completed_at: turn_aborted_event.completed_at,
|
||||
duration_ms: turn_aborted_event.duration_ms,
|
||||
},
|
||||
analytics_events_client,
|
||||
outgoing,
|
||||
)
|
||||
.await;
|
||||
@@ -2177,6 +2198,7 @@ async fn handle_thread_rollback_failed(
|
||||
async fn handle_token_count_event(
|
||||
conversation_id: ThreadId,
|
||||
turn_id: String,
|
||||
analytics_events_client: Option<&AnalyticsEventsClient>,
|
||||
token_count_event: TokenCountEvent,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
) {
|
||||
@@ -2187,6 +2209,11 @@ async fn handle_token_count_event(
|
||||
turn_id,
|
||||
token_usage,
|
||||
};
|
||||
if let Some(analytics_events_client) = analytics_events_client {
|
||||
analytics_events_client.track_notification(
|
||||
ServerNotification::ThreadTokenUsageUpdated(notification.clone()),
|
||||
);
|
||||
}
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification))
|
||||
.await;
|
||||
@@ -2885,6 +2912,7 @@ mod tests {
|
||||
use codex_app_server_protocol::GuardianApprovalReviewStatus;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::TurnPlanStepStatus;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::items::HookPromptFragment;
|
||||
use codex_protocol::items::build_hook_prompt_message;
|
||||
@@ -3006,6 +3034,7 @@ mod tests {
|
||||
outgoing: ThreadScopedOutgoingMessageSender,
|
||||
thread_state: Arc<Mutex<ThreadState>>,
|
||||
thread_watch_manager: ThreadWatchManager,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
codex_home: PathBuf,
|
||||
}
|
||||
|
||||
@@ -3020,6 +3049,7 @@ mod tests {
|
||||
self.conversation_id,
|
||||
self.conversation.clone(),
|
||||
self.thread_manager.clone(),
|
||||
self.analytics_events_client.clone(),
|
||||
self.outgoing.clone(),
|
||||
self.thread_state.clone(),
|
||||
self.thread_watch_manager.clone(),
|
||||
@@ -3328,6 +3358,13 @@ mod tests {
|
||||
outgoing: outgoing.clone(),
|
||||
thread_state: thread_state.clone(),
|
||||
thread_watch_manager: thread_watch_manager.clone(),
|
||||
analytics_events_client: AnalyticsEventsClient::new(
|
||||
AuthManager::from_auth_for_testing(
|
||||
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
|
||||
),
|
||||
"http://localhost".to_string(),
|
||||
Some(false),
|
||||
),
|
||||
codex_home: codex_home.path().to_path_buf(),
|
||||
};
|
||||
|
||||
@@ -3736,6 +3773,7 @@ mod tests {
|
||||
conversation_id,
|
||||
event_turn_id.clone(),
|
||||
turn_complete_event(&event_turn_id),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -3784,6 +3822,7 @@ mod tests {
|
||||
conversation_id,
|
||||
event_turn_id.clone(),
|
||||
turn_aborted_event(&event_turn_id),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -3831,6 +3870,7 @@ mod tests {
|
||||
conversation_id,
|
||||
event_turn_id.clone(),
|
||||
turn_complete_event(&event_turn_id),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -3959,6 +3999,7 @@ mod tests {
|
||||
handle_token_count_event(
|
||||
conversation_id,
|
||||
turn_id.clone(),
|
||||
/*analytics_events_client*/ None,
|
||||
TokenCountEvent {
|
||||
info: Some(info),
|
||||
rate_limits: Some(rate_limits),
|
||||
@@ -4013,6 +4054,7 @@ mod tests {
|
||||
handle_token_count_event(
|
||||
conversation_id,
|
||||
turn_id.clone(),
|
||||
/*analytics_events_client*/ None,
|
||||
TokenCountEvent {
|
||||
info: None,
|
||||
rate_limits: None,
|
||||
@@ -4097,6 +4139,7 @@ mod tests {
|
||||
conversation_a,
|
||||
a_turn1.clone(),
|
||||
turn_complete_event(&a_turn1),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -4118,6 +4161,7 @@ mod tests {
|
||||
conversation_b,
|
||||
b_turn1.clone(),
|
||||
turn_complete_event(&b_turn1),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
@@ -4129,6 +4173,7 @@ mod tests {
|
||||
conversation_a,
|
||||
a_turn2.clone(),
|
||||
turn_complete_event(&a_turn2),
|
||||
/*analytics_events_client*/ None,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
|
||||
@@ -6670,6 +6670,15 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
let response = TurnStartResponse { turn };
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics) {
|
||||
self.analytics_events_client.track_response(
|
||||
request_id.connection_id.0,
|
||||
ClientResponse::TurnStart {
|
||||
request_id: request_id.request_id.clone(),
|
||||
response: response.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -7446,6 +7455,7 @@ impl CodexMessageProcessor {
|
||||
conversation_id,
|
||||
conversation.clone(),
|
||||
thread_manager.clone(),
|
||||
listener_task_context.analytics_events_client.clone(),
|
||||
thread_outgoing,
|
||||
thread_state.clone(),
|
||||
thread_watch_manager.clone(),
|
||||
|
||||
@@ -219,7 +219,12 @@ impl MessageProcessor {
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}));
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new_with_analytics_events_client(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
@@ -229,12 +234,8 @@ impl MessageProcessor {
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
environment_manager,
|
||||
Some(analytics_events_client.clone()),
|
||||
));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
@@ -671,6 +672,15 @@ impl MessageProcessor {
|
||||
self.outgoing.send_error(connection_request_id, error).await;
|
||||
return;
|
||||
}
|
||||
if self.config.features.enabled(Feature::GeneralAnalytics)
|
||||
&& let ClientRequest::TurnStart { request_id, .. } = &codex_request
|
||||
{
|
||||
self.analytics_events_client.track_request(
|
||||
connection_id.0,
|
||||
request_id.clone(),
|
||||
codex_request.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
match codex_request {
|
||||
ClientRequest::ConfigRead { request_id, params } => {
|
||||
|
||||
@@ -78,3 +78,31 @@ model_provider = "{model_provider_id}"
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
chatgpt_base_url: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
chatgpt_base_url = "{chatgpt_base_url}"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ pub use auth_fixtures::encode_id_token;
|
||||
pub use auth_fixtures::write_chatgpt_auth;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
pub use config::write_mock_responses_config_toml;
|
||||
pub use config::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
pub use core_test_support::format_with_current_shell;
|
||||
pub use core_test_support::format_with_current_shell_display;
|
||||
pub use core_test_support::format_with_current_shell_display_non_login;
|
||||
|
||||
@@ -120,6 +120,41 @@ pub(crate) async fn wait_for_analytics_payload(
|
||||
serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_event(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
event_type: &str,
|
||||
) -> Result<Value> {
|
||||
timeout(read_timeout, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
for request in &requests {
|
||||
if request.method != "POST"
|
||||
|| request.url.path() != "/codex/analytics-events/events"
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let payload: Value = serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
|
||||
let Some(events) = payload["events"].as_array() else {
|
||||
continue;
|
||||
};
|
||||
if let Some(event) = events
|
||||
.iter()
|
||||
.find(|event| event["event_type"] == event_type)
|
||||
{
|
||||
return Ok::<Value, anyhow::Error>(event.clone());
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> {
|
||||
let events = payload["events"]
|
||||
.as_array()
|
||||
|
||||
@@ -3,8 +3,10 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -22,6 +24,9 @@ use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::analytics::enable_analytics_capture;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
@@ -43,14 +48,20 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
std::fs::create_dir(&working_directory)?;
|
||||
|
||||
// Mock server: long-running shell command then (after abort) nothing else needed.
|
||||
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000),
|
||||
"call_sleep",
|
||||
)?])
|
||||
.await;
|
||||
create_config_toml(&codex_home, &server.uri(), "never", "danger-full-access")?;
|
||||
let server =
|
||||
create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000),
|
||||
"call_sleep",
|
||||
)?])
|
||||
.await;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
&codex_home,
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
enable_analytics_capture(&server, &codex_home).await?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -87,6 +98,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
let turn_id = turn.id.clone();
|
||||
|
||||
// Give the command a brief moment to start.
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
@@ -96,7 +108,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
let interrupt_id = mcp
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id: thread_id.clone(),
|
||||
turn_id: turn.id,
|
||||
turn_id: turn_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let interrupt_resp: JSONRPCResponse = timeout(
|
||||
@@ -119,6 +131,12 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
assert_eq!(completed.thread_id, thread_id);
|
||||
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
|
||||
|
||||
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
|
||||
assert_eq!(event["event_params"]["thread_id"], thread_id);
|
||||
assert_eq!(event["event_params"]["turn_id"], turn_id);
|
||||
assert_eq!(event["event_params"]["status"], "interrupted");
|
||||
assert_eq!(event["event_params"]["turn_error"], serde_json::Value::Null);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::DEFAULT_CLIENT_NAME;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_exec_command_sse_response;
|
||||
@@ -9,6 +10,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::format_with_current_shell_display;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
|
||||
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
|
||||
use codex_app_server_protocol::ByteRange;
|
||||
@@ -64,6 +66,9 @@ use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::analytics::enable_analytics_capture;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
#[cfg(not(windows))]
|
||||
@@ -335,6 +340,148 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
enable_analytics_capture(&server, codex_home.path()).await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Image {
|
||||
url: "https://example.com/a.png".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
|
||||
assert_eq!(event["event_params"]["thread_id"], thread.id);
|
||||
assert_eq!(event["event_params"]["turn_id"], turn.id);
|
||||
assert_eq!(
|
||||
event["event_params"]["product_client_id"],
|
||||
DEFAULT_CLIENT_NAME
|
||||
);
|
||||
assert_eq!(event["event_params"]["model"], "mock-model");
|
||||
assert_eq!(event["event_params"]["model_provider"], "mock_provider");
|
||||
assert_eq!(event["event_params"]["sandbox_policy"], "read_only");
|
||||
assert_eq!(event["event_params"]["num_input_images"], 1);
|
||||
assert_eq!(event["event_params"]["status"], "completed");
|
||||
assert!(event["event_params"]["started_at"].as_u64().is_some());
|
||||
assert!(event["event_params"]["completed_at"].as_u64().is_some());
|
||||
assert!(event["event_params"]["duration_ms"].as_u64().is_some());
|
||||
assert_eq!(event["event_params"]["input_tokens"], 0);
|
||||
assert_eq!(event["event_params"]["cached_input_tokens"], 0);
|
||||
assert_eq!(event["event_params"]["output_tokens"], 0);
|
||||
assert_eq!(event["event_params"]["reasoning_output_tokens"], 0);
|
||||
assert_eq!(event["event_params"]["total_tokens"], 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_tracks_failed_turn_event_analytics() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence(vec![String::new()]).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
enable_analytics_capture(&server, codex_home.path()).await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "trigger failed turn".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let completed_notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
let completed: TurnCompletedNotification = serde_json::from_value(
|
||||
completed_notif
|
||||
.params
|
||||
.expect("turn/completed params must be present"),
|
||||
)?;
|
||||
assert_eq!(completed.turn.status, TurnStatus::Failed);
|
||||
assert!(completed.turn.error.is_some());
|
||||
|
||||
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
|
||||
assert_eq!(event["event_params"]["thread_id"], thread.id);
|
||||
assert_eq!(event["event_params"]["turn_id"], turn.id);
|
||||
assert_eq!(event["event_params"]["status"], "failed");
|
||||
assert_ne!(event["event_params"]["turn_error"], serde_json::Value::Null);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_accepts_text_at_limit_with_mention_item() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
|
||||
@@ -51,6 +51,7 @@ use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AppInvocation;
|
||||
use codex_analytics::InvocationType;
|
||||
use codex_analytics::SubAgentThreadStartedInput;
|
||||
use codex_analytics::TurnResolvedConfigFact;
|
||||
use codex_analytics::build_track_events_context;
|
||||
use codex_app_server_protocol::McpServerElicitationRequest;
|
||||
use codex_app_server_protocol::McpServerElicitationRequestParams;
|
||||
@@ -185,6 +186,7 @@ use crate::config::StartedNetworkProxy;
|
||||
use crate::config::resolve_web_search_mode_for_turn;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use crate::environment_context::EnvironmentContext;
|
||||
use codex_config::CONFIG_TOML_FILE;
|
||||
use codex_config::types::McpServerConfig;
|
||||
@@ -430,6 +432,7 @@ pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
|
||||
pub(crate) user_shell_override: Option<shell::Shell>,
|
||||
pub(crate) parent_trace: Option<W3cTraceContext>,
|
||||
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
}
|
||||
|
||||
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
|
||||
@@ -484,6 +487,7 @@ impl Codex {
|
||||
user_shell_override,
|
||||
inherited_exec_policy,
|
||||
parent_trace: _,
|
||||
analytics_events_client,
|
||||
} = args;
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
@@ -678,6 +682,7 @@ impl Codex {
|
||||
skills_watcher,
|
||||
agent_control,
|
||||
environment,
|
||||
analytics_events_client,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -817,6 +822,17 @@ pub(crate) fn session_loop_termination_from_handle(
|
||||
.shared()
|
||||
}
|
||||
|
||||
fn initial_history_has_prior_user_turns(conversation_history: &InitialHistory) -> bool {
|
||||
conversation_history.scan_rollout_items(rollout_item_is_user_turn_boundary)
|
||||
}
|
||||
|
||||
fn rollout_item_is_user_turn_boundary(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item) => is_user_turn_boundary(item),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for an initialized model agent
|
||||
///
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
@@ -1532,6 +1548,7 @@ impl Session {
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
agent_control: AgentControl,
|
||||
environment: Option<Arc<Environment>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
@@ -1936,11 +1953,13 @@ impl Session {
|
||||
),
|
||||
shell_zsh_path: config.zsh_path.clone(),
|
||||
main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(),
|
||||
analytics_events_client: AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
),
|
||||
analytics_events_client: analytics_events_client.unwrap_or_else(|| {
|
||||
AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
}),
|
||||
hooks,
|
||||
rollout: Mutex::new(rollout_recorder),
|
||||
user_shell: Arc::new(default_shell),
|
||||
@@ -2257,6 +2276,11 @@ impl Session {
|
||||
SessionSource::SubAgent(_)
|
||||
)
|
||||
};
|
||||
let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history);
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_next_turn_is_first(!has_prior_user_turns);
|
||||
}
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
// Defer initial context insertion until the first real turn starts so
|
||||
@@ -6035,6 +6059,8 @@ pub(crate) async fn run_turn(
|
||||
.await;
|
||||
}
|
||||
|
||||
track_turn_resolved_config_analytics(&sess, &turn_context, &input).await;
|
||||
|
||||
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
|
||||
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
|
||||
.await;
|
||||
@@ -6321,6 +6347,42 @@ pub(crate) async fn run_turn(
|
||||
last_agent_message
|
||||
}
|
||||
|
||||
async fn track_turn_resolved_config_analytics(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
input: &[UserInput],
|
||||
) {
|
||||
let is_first_turn = {
|
||||
let mut state = sess.state.lock().await;
|
||||
state.take_next_turn_is_first()
|
||||
};
|
||||
sess.services
|
||||
.analytics_events_client
|
||||
.track_turn_resolved_config(TurnResolvedConfigFact {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
num_input_images: input
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
|
||||
})
|
||||
.count(),
|
||||
submission_type: None,
|
||||
model: turn_context.model_info.slug.clone(),
|
||||
model_provider: turn_context.config.model_provider_id.clone(),
|
||||
sandbox_policy: turn_context.sandbox_policy.get().clone(),
|
||||
reasoning_effort: turn_context.reasoning_effort,
|
||||
reasoning_summary: Some(turn_context.reasoning_summary),
|
||||
service_tier: turn_context.config.service_tier,
|
||||
approval_policy: turn_context.approval_policy.value(),
|
||||
approvals_reviewer: turn_context.config.approvals_reviewer,
|
||||
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
|
||||
collaboration_mode: turn_context.collaboration_mode.mode,
|
||||
personality: turn_context.personality,
|
||||
is_first_turn,
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
|
||||
@@ -95,6 +95,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
user_shell_override: None,
|
||||
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
|
||||
parent_trace: None,
|
||||
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
|
||||
})
|
||||
.await?;
|
||||
if parent_session.enabled(codex_features::Feature::GeneralAnalytics) {
|
||||
|
||||
@@ -2626,6 +2626,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
|
||||
.await
|
||||
.expect("create environment"),
|
||||
)),
|
||||
/*analytics_events_client*/ None,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -457,6 +457,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
|
||||
user_shell_override: None,
|
||||
parent_trace: None,
|
||||
analytics_events_client: None,
|
||||
})
|
||||
.await
|
||||
.expect("spawn guardian subagent");
|
||||
|
||||
@@ -33,6 +33,7 @@ pub(crate) struct SessionState {
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
next_turn_is_first: bool,
|
||||
}
|
||||
|
||||
impl SessionState {
|
||||
@@ -51,6 +52,7 @@ impl SessionState {
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
granted_permissions: None,
|
||||
next_turn_is_first: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +75,16 @@ impl SessionState {
|
||||
self.previous_turn_settings = previous_turn_settings;
|
||||
}
|
||||
|
||||
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
|
||||
self.next_turn_is_first = value;
|
||||
}
|
||||
|
||||
pub(crate) fn take_next_turn_is_first(&mut self) -> bool {
|
||||
let is_first_turn = self.next_turn_is_first;
|
||||
self.next_turn_is_first = false;
|
||||
is_first_turn
|
||||
}
|
||||
|
||||
pub(crate) fn clone_history(&self) -> ContextManager {
|
||||
self.history.clone()
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills_watcher::SkillsWatcher;
|
||||
use crate::skills_watcher::SkillsWatcherEvent;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
@@ -208,6 +209,7 @@ pub(crate) struct ThreadManagerState {
|
||||
mcp_manager: Arc<McpManager>,
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
session_source: SessionSource,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
// Captures submitted ops for testing purpose when test mode is enabled.
|
||||
ops_log: Option<SharedCapturedOps>,
|
||||
}
|
||||
@@ -219,6 +221,24 @@ impl ThreadManager {
|
||||
session_source: SessionSource,
|
||||
collaboration_modes_config: CollaborationModesConfig,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
) -> Self {
|
||||
Self::new_with_analytics_events_client(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
collaboration_modes_config,
|
||||
environment_manager,
|
||||
/*analytics_events_client*/ None,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn new_with_analytics_events_client(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
collaboration_modes_config: CollaborationModesConfig,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
) -> Self {
|
||||
let codex_home = config.codex_home.clone();
|
||||
let restriction_product = session_source.restriction_product();
|
||||
@@ -257,6 +277,7 @@ impl ThreadManager {
|
||||
skills_watcher,
|
||||
auth_manager,
|
||||
session_source,
|
||||
analytics_events_client,
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
|
||||
}),
|
||||
@@ -326,6 +347,7 @@ impl ThreadManager {
|
||||
skills_watcher,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
analytics_events_client: None,
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
|
||||
}),
|
||||
@@ -867,6 +889,7 @@ impl ThreadManagerState {
|
||||
inherited_exec_policy,
|
||||
user_shell_override,
|
||||
parent_trace,
|
||||
analytics_events_client: self.analytics_events_client.clone(),
|
||||
})
|
||||
.await?;
|
||||
self.finalize_thread_spawn(codex, thread_id, watch_registration)
|
||||
|
||||
@@ -2277,6 +2277,14 @@ pub enum InitialHistory {
|
||||
}
|
||||
|
||||
impl InitialHistory {
|
||||
pub fn scan_rollout_items(&self, mut predicate: impl FnMut(&RolloutItem) -> bool) -> bool {
|
||||
match self {
|
||||
InitialHistory::New => false,
|
||||
InitialHistory::Resumed(resumed) => resumed.history.iter().any(&mut predicate),
|
||||
InitialHistory::Forked(items) => items.iter().any(predicate),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn forked_from_id(&self) -> Option<ThreadId> {
|
||||
match self {
|
||||
InitialHistory::New => None,
|
||||
|
||||
Reference in New Issue
Block a user