Compare commits

...

17 Commits

Author SHA1 Message Date
Ahmed Ibrahim
a0219c2eb9 Track turn timing in ChatGPT telemetry (#24144)
## Why
This is the turn-specific slice of the ChatGPT telemetry stack. We want
per-turn timing that breaks total turn time into request-start delay,
sampling time, and blocking tool critical path, while keeping approval
wait attributable to the turn without mixing in app-server or thread
startup costs.

## What changed
- extend `TurnTimingState` in `codex-rs/core` to accumulate
request-start delay, sampling duration, and blocking-tool critical-path
duration across a turn
- emit a `TurnTimingBreakdownFact` at turn completion and attach it to
`codex_turn_event`
- roll review durations into `approval_wait_duration_ms` and derive
`finalize_duration_ms` as the remainder after request start, sampling,
and blocking tool time
- add reducer and timing-state coverage for the new turn timing fields
and approval roll-up

## Verification
- `cargo check -p codex-analytics -p codex-app-server -p codex-core`
- `just fix -p codex-analytics -p codex-core`
2026-05-27 12:01:37 -07:00
Ahmed Ibrahim
5c78d1caa8 codex: fix CI failure on PR #24143 2026-05-22 17:54:11 -07:00
Ahmed Ibrahim
76dac513e6 codex: fix CI failure on PR #24143 2026-05-22 17:54:11 -07:00
Ahmed Ibrahim
f49153ee4c codex: keep thread timing conditioning in analytics 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
253917fc08 codex: always time thread start flows 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
82fe09d23e codex: simplify thread start timing state 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
c88ef65433 codex: group thread-start timing params 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
2be5521875 Track thread start timing in core 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
9f26d24459 Track thread start in ChatGPT telemetry 2026-05-22 17:54:10 -07:00
Ahmed Ibrahim
efab4dc64c codex: fix CI failure on PR #24142 2026-05-22 17:54:07 -07:00
Ahmed Ibrahim
517601aceb codex: fix CI failure on PR #24142 2026-05-22 17:44:01 -07:00
Ahmed Ibrahim
73fbe07c8e codex: fix CI failure on PR #24142 2026-05-22 17:37:59 -07:00
Ahmed Ibrahim
45ede0194a codex: fix CI failure on PR #24142 2026-05-22 17:28:33 -07:00
Ahmed Ibrahim
fbf6c687fe codex: fix CI failure on PR #24142 2026-05-22 17:15:45 -07:00
Ahmed Ibrahim
4fdc8e8124 codex: fix CI failure on PR #24142 2026-05-22 17:07:48 -07:00
Ahmed Ibrahim
50056e7f1d codex: fix CI failure on PR #24142 2026-05-22 17:00:14 -07:00
Ahmed Ibrahim
ee60c494da Track app-server start in ChatGPT telemetry 2026-05-22 16:54:46 -07:00
22 changed files with 992 additions and 173 deletions

View File

@@ -4,6 +4,8 @@ use crate::events::CodexAcceptedLineFingerprintsEventParams;
use crate::events::CodexAcceptedLineFingerprintsEventRequest;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppServerStartedEventParams;
use crate::events::CodexAppServerStartedEventRequest;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCommandExecutionEventParams;
use crate::events::CodexCommandExecutionEventRequest;
@@ -15,6 +17,7 @@ use crate::events::CodexReviewEventParams;
use crate::events::CodexReviewEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolItemEventBase;
use crate::events::CodexTurnEventParams;
use crate::events::CodexTurnEventRequest;
use crate::events::FinalApprovalOutcome;
use crate::events::GuardianApprovalRequestSource;
@@ -30,6 +33,7 @@ use crate::events::ReviewTrigger;
use crate::events::Reviewer;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ThreadStartTimingEventParams;
use crate::events::ToolItemTerminalStatus;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
@@ -41,6 +45,7 @@ use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppServerStartedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CompactionImplementation;
@@ -61,10 +66,12 @@ use crate::facts::SkillInvocation;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::ThreadStartTimingFact;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRequestError;
use crate::facts::TurnTimingBreakdownFact;
use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use crate::reducer::normalize_path_for_skill_id;
@@ -331,6 +338,16 @@ fn sample_turn_token_usage_fact(thread_id: &str, turn_id: &str) -> TurnTokenUsag
}
}
/// Test fixture: a timing breakdown for the given thread/turn with a 120 ms
/// request-start delay, 900 ms of sampling, and 140 ms of blocking tool time.
fn sample_turn_timing_breakdown_fact(thread_id: &str, turn_id: &str) -> TurnTimingBreakdownFact {
    let turn_id = turn_id.to_owned();
    let thread_id = thread_id.to_owned();
    TurnTimingBreakdownFact {
        turn_id,
        thread_id,
        request_start_delay_ms: Some(120),
        sampling_duration_ms: 900,
        blocking_tool_critical_path_duration_ms: 140,
    }
}
fn sample_turn_completed_notification(
thread_id: &str,
turn_id: &str,
@@ -1325,6 +1342,12 @@ fn thread_initialized_event_serializes_expected_shape() {
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
thread_start_timing: ThreadStartTimingEventParams {
thread_start_duration_ms: Some(321),
thread_start_prepare_duration_ms: Some(111),
thread_start_spawn_duration_ms: Some(123),
thread_start_finalize_duration_ms: Some(87),
},
created_at: 1,
},
});
@@ -1356,12 +1379,49 @@ fn thread_initialized_event_serializes_expected_shape() {
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
"thread_start_duration_ms": 321,
"thread_start_prepare_duration_ms": 111,
"thread_start_spawn_duration_ms": 123,
"thread_start_finalize_duration_ms": 87,
"created_at": 1
}
})
);
}
/// Pins the JSON wire shape of a `codex_app_server_started` track event.
#[test]
fn app_server_started_event_serializes_expected_shape() {
    let params = CodexAppServerStartedEventParams {
        runtime: sample_runtime_metadata(),
        remote_control_enabled: true,
        startup_duration_ms: 987,
        completed_at: 12,
    };
    let request = TrackEventRequest::AppServerStarted(CodexAppServerStartedEventRequest {
        event_type: "codex_app_server_started",
        event_params: params,
    });

    let expected = json!({
        "event_type": "codex_app_server_started",
        "event_params": {
            "runtime": {
                "codex_rs_version": "0.1.0",
                "runtime_os": "macos",
                "runtime_os_version": "15.3.1",
                "runtime_arch": "aarch64"
            },
            "remote_control_enabled": true,
            "startup_duration_ms": 987,
            "completed_at": 12
        }
    });

    let serialized =
        serde_json::to_value(&request).expect("serialize app-server started event");
    assert_eq!(serialized, expected);
}
#[test]
fn command_execution_event_serializes_expected_shape() {
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
@@ -1637,6 +1697,114 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
payload[0]["event_params"]["runtime"]["runtime_arch"],
"x86_64"
);
assert_eq!(
payload[0]["event_params"]["thread_start_duration_ms"],
json!(null)
);
assert_eq!(
payload[0]["event_params"]["thread_start_prepare_duration_ms"],
json!(null)
);
assert_eq!(
payload[0]["event_params"]["thread_start_spawn_duration_ms"],
json!(null)
);
assert_eq!(
payload[0]["event_params"]["thread_start_finalize_duration_ms"],
json!(null)
);
}
/// Feeding an `AppServerStarted` custom fact into the reducer must yield exactly
/// one `codex_app_server_started` event carrying the fact's fields unchanged.
#[tokio::test]
async fn app_server_started_fact_emits_event() {
    let mut reducer = AnalyticsReducer::default();
    let mut emitted = Vec::new();

    let started = AppServerStartedInput {
        runtime: sample_runtime_metadata(),
        remote_control_enabled: true,
        startup_duration_ms: 456,
        completed_at: 12,
    };
    let fact = AnalyticsFact::Custom(CustomAnalyticsFact::AppServerStarted(started));
    reducer.ingest(fact, &mut emitted).await;

    let expected = json!([{
        "event_type": "codex_app_server_started",
        "event_params": {
            "runtime": {
                "codex_rs_version": "0.1.0",
                "runtime_os": "macos",
                "runtime_os_version": "15.3.1",
                "runtime_arch": "aarch64"
            },
            "remote_control_enabled": true,
            "startup_duration_ms": 456,
            "completed_at": 12
        }
    }]);

    let serialized = serde_json::to_value(&emitted).expect("serialize events");
    assert_eq!(serialized, expected);
}
/// A `ThreadStartTiming` fact ingested before the thread-start response must
/// surface its four durations on the resulting `codex_thread_initialized` event.
#[tokio::test]
async fn thread_start_timing_fact_enriches_thread_initialized_event() {
    let mut reducer = AnalyticsReducer::default();
    let mut events = Vec::new();
    ingest_initialize(&mut reducer, &mut events).await;

    // Timing arrives first and is held by the reducer until the thread response shows up.
    let timing_fact = ThreadStartTimingFact {
        thread_id: "thread-1".to_string(),
        duration_ms: 222,
        prepare_duration_ms: 12,
        spawn_duration_ms: 123,
        finalize_duration_ms: 87,
    };
    reducer
        .ingest(
            AnalyticsFact::Custom(CustomAnalyticsFact::ThreadStartTiming(timing_fact)),
            &mut events,
        )
        .await;

    let thread_start_response = AnalyticsFact::ClientResponse {
        connection_id: 7,
        request_id: RequestId::Integer(2),
        response: Box::new(sample_thread_start_response(
            "thread-1", /*ephemeral*/ true, "gpt-5",
        )),
    };
    reducer.ingest(thread_start_response, &mut events).await;

    let payload = serde_json::to_value(&events).expect("serialize events");
    assert_eq!(payload[0]["event_type"], json!("codex_thread_initialized"));

    let expected_timings: [(&str, u64); 4] = [
        ("thread_start_duration_ms", 222),
        ("thread_start_prepare_duration_ms", 12),
        ("thread_start_spawn_duration_ms", 123),
        ("thread_start_finalize_duration_ms", 87),
    ];
    for (field, expected_ms) in expected_timings {
        assert_eq!(payload[0]["event_params"][field], json!(expected_ms));
    }
}
#[tokio::test]
@@ -3188,7 +3356,7 @@ async fn reducer_ingests_plugin_state_changed_fact() {
fn turn_event_serializes_expected_shape() {
let event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: crate::events::CodexTurnEventParams {
event_params: CodexTurnEventParams {
thread_id: "thread-2".to_string(),
turn_id: "turn-2".to_string(),
app_server_client: sample_app_server_client_metadata(),
@@ -3229,6 +3397,11 @@ fn turn_event_serializes_expected_shape() {
reasoning_output_tokens: None,
total_tokens: None,
duration_ms: Some(1234),
request_start_delay_ms: Some(120),
sampling_duration_ms: Some(900),
blocking_tool_critical_path_duration_ms: Some(140),
approval_wait_duration_ms: Some(0),
finalize_duration_ms: Some(74),
started_at: Some(455),
completed_at: Some(456),
},
@@ -3290,6 +3463,11 @@ fn turn_event_serializes_expected_shape() {
"reasoning_output_tokens": null,
"total_tokens": null,
"duration_ms": 1234,
"request_start_delay_ms": 120,
"sampling_duration_ms": 900,
"blocking_tool_critical_path_duration_ms": 140,
"approval_wait_duration_ms": 0,
"finalize_duration_ms": 74,
"started_at": 455,
"completed_at": 456
}
@@ -3605,6 +3783,164 @@ async fn turn_lifecycle_emits_turn_event() {
json!(13)
);
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
assert_eq!(
payload["event_params"]["request_start_delay_ms"],
json!(null)
);
assert_eq!(payload["event_params"]["sampling_duration_ms"], json!(null));
assert_eq!(
payload["event_params"]["blocking_tool_critical_path_duration_ms"],
json!(null)
);
assert_eq!(
payload["event_params"]["approval_wait_duration_ms"],
json!(null)
);
assert_eq!(payload["event_params"]["finalize_duration_ms"], json!(null));
}
/// A `TurnTimingBreakdown` fact ingested before turn completion must populate
/// the timing fields of the emitted turn event, with zero approval wait and a
/// finalize duration equal to the remainder of the turn duration.
#[tokio::test]
async fn turn_timing_breakdown_fact_enriches_turn_event() {
    let mut reducer = AnalyticsReducer::default();
    let mut out = Vec::new();

    ingest_turn_prerequisites(
        &mut reducer,
        &mut out,
        /*include_initialize*/ true,
        /*include_resolved_config*/ true,
        /*include_started*/ true,
        /*include_token_usage*/ false,
    )
    .await;

    let breakdown = AnalyticsFact::Custom(CustomAnalyticsFact::TurnTimingBreakdown(Box::new(
        sample_turn_timing_breakdown_fact("thread-2", "turn-2"),
    )));
    reducer.ingest(breakdown, &mut out).await;

    let turn_completed = AnalyticsFact::Notification(Box::new(
        sample_turn_completed_notification(
            "thread-2",
            "turn-2",
            AppServerTurnStatus::Completed,
            /*codex_error_info*/ None,
        ),
    ));
    reducer.ingest(turn_completed, &mut out).await;

    let payload = serde_json::to_value(&out[0]).expect("serialize turn event");

    let expected_timings: [(&str, u64); 5] = [
        ("request_start_delay_ms", 120),
        ("sampling_duration_ms", 900),
        ("blocking_tool_critical_path_duration_ms", 140),
        ("approval_wait_duration_ms", 0),
        ("finalize_duration_ms", 74),
    ];
    for (field, expected_ms) in expected_timings {
        assert_eq!(payload["event_params"][field], json!(expected_ms));
    }
}
// Verifies that the duration of a completed approval request is accumulated
// into the owning turn's `approval_wait_duration_ms`, and that
// `finalize_duration_ms` is still the remainder after request-start delay,
// sampling, and blocking tool time (approval wait is not subtracted from it).
#[tokio::test]
async fn review_durations_roll_up_into_turn_approval_wait() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_turn_prerequisites(
&mut reducer,
&mut out,
/*include_initialize*/ true,
/*include_resolved_config*/ true,
/*include_started*/ true,
/*include_token_usage*/ false,
)
.await;
// Timing breakdown for the turn: 100 + 400 + 700 = 1200 ms accounted for.
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTimingBreakdown(Box::new(
TurnTimingBreakdownFact {
turn_id: "turn-2".to_string(),
thread_id: "thread-2".to_string(),
request_start_delay_ms: Some(100),
sampling_duration_ms: 400,
blocking_tool_critical_path_duration_ms: 700,
},
))),
&mut out,
)
.await;
// Approval request for the same turn, started at t = 1_000 ms.
reducer
.ingest(
AnalyticsFact::ServerRequest {
connection_id: 7,
request: Box::new(ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(99),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-2".to_string(),
turn_id: "turn-2".to_string(),
item_id: "item-1".to_string(),
started_at_ms: 1_000,
approval_id: None,
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
}),
},
&mut out,
)
.await;
// Matching response (same request id) completed at t = 1_600 ms.
reducer
.ingest(
AnalyticsFact::ServerResponse {
completed_at_ms: 1_600,
response: Box::new(ServerResponse::CommandExecutionRequestApproval {
request_id: RequestId::Integer(99),
response: CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Accept,
},
}),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
"thread-2",
"turn-2",
AppServerTurnStatus::Completed,
/*codex_error_info*/ None,
))),
&mut out,
)
.await;
// Other events (e.g. the review event) may precede the turn event in `out`,
// so locate the turn event by variant instead of by index.
let turn_event = out
.iter()
.find(|event| matches!(event, TrackEventRequest::TurnEvent(_)))
.expect("turn event should be emitted");
let payload = serde_json::to_value(turn_event).expect("serialize turn event");
// Expected approval wait matches 1_600 - 1_000 from the request/response above.
assert_eq!(
payload["event_params"]["approval_wait_duration_ms"],
json!(600)
);
// NOTE(review): a 34 ms remainder over the 1200 ms breakdown implies the
// completed-notification fixture reports a 1234 ms turn duration — confirm
// against `sample_turn_completed_notification`.
assert_eq!(payload["event_params"]["finalize_duration_ms"], json!(34));
}
#[tokio::test]

View File

@@ -8,6 +8,7 @@ use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppServerStartedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::HookRunFact;
@@ -17,9 +18,12 @@ use crate::facts::PluginStateChangedInput;
use crate::facts::SkillInvocation;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadStartTimingFact;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnTimingBreakdownFact;
use crate::facts::TurnTokenUsageFact;
use crate::now_unix_seconds;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
@@ -38,12 +42,35 @@ use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
/// Captures the instant an operation began so its elapsed wall time can be
/// reported later in whole milliseconds.
#[derive(Clone, Copy, Debug)]
pub struct StartedTimer {
    started_at: Instant,
}

impl StartedTimer {
    /// Starts a timer anchored at the current instant.
    #[must_use]
    pub fn start() -> Self {
        let started_at = Instant::now();
        Self { started_at }
    }

    /// Milliseconds elapsed since [`StartedTimer::start`], clamped to
    /// `u64::MAX` if the millisecond count does not fit in a `u64`.
    fn elapsed_ms(self) -> u64 {
        let elapsed_millis = self.started_at.elapsed().as_millis();
        u64::try_from(elapsed_millis).unwrap_or(u64::MAX)
    }
}
#[derive(Clone)]
pub(crate) struct AnalyticsEventsQueue {
pub(crate) sender: mpsc::Sender<AnalyticsFact>,
@@ -163,12 +190,35 @@ impl AnalyticsEventsClient {
});
}
/// Records an app-server-started fact: the current runtime metadata, whether
/// remote control is enabled, the time elapsed on `timer`, and the completion
/// timestamp taken from `now_unix_seconds()`.
pub fn track_app_server_started(&self, timer: StartedTimer, remote_control_enabled: bool) {
    let started = AppServerStartedInput {
        runtime: current_runtime_metadata(),
        remote_control_enabled,
        startup_duration_ms: timer.elapsed_ms(),
        completed_at: now_unix_seconds(),
    };
    let fact = CustomAnalyticsFact::AppServerStarted(started);
    self.record_fact(AnalyticsFact::Custom(fact));
}
pub fn track_subagent_thread_started(&self, input: SubAgentThreadStartedInput) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::SubAgentThreadStarted(input),
));
}
/// Records the thread-start timing `fact` as a custom analytics fact.
pub fn track_thread_start_timing(&self, fact: ThreadStartTimingFact) {
    let custom = CustomAnalyticsFact::ThreadStartTiming(fact);
    self.record_fact(AnalyticsFact::Custom(custom));
}
/// Records the per-turn timing breakdown `fact` (boxed) as a custom analytics fact.
pub fn track_turn_timing(&self, fact: TurnTimingBreakdownFact) {
    let boxed = Box::new(fact);
    let custom = CustomAnalyticsFact::TurnTimingBreakdown(boxed);
    self.record_fact(AnalyticsFact::Custom(custom));
}
pub fn track_guardian_review(
&self,
tracking: &GuardianReviewTrackContext,

View File

@@ -56,6 +56,7 @@ pub(crate) struct TrackEventsRequest {
#[serde(untagged)]
pub(crate) enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
AppServerStarted(CodexAppServerStartedEventRequest),
ThreadInitialized(ThreadInitializedEvent),
GuardianReview(Box<GuardianReviewEventRequest>),
AppMentioned(CodexAppMentionedEventRequest),
@@ -144,6 +145,28 @@ pub(crate) struct CodexRuntimeMetadata {
pub(crate) runtime_arch: String,
}
/// Serialized `event_params` payload of the `codex_app_server_started` event.
#[derive(Serialize)]
pub(crate) struct CodexAppServerStartedEventParams {
/// Runtime metadata of the reporting process.
pub(crate) runtime: CodexRuntimeMetadata,
/// Whether remote control was enabled for this app-server run.
pub(crate) remote_control_enabled: bool,
/// Elapsed startup time in milliseconds.
pub(crate) startup_duration_ms: u64,
/// Timestamp at which startup was recorded (the client fills this from
/// `now_unix_seconds()`).
pub(crate) completed_at: u64,
}
/// Track-event envelope pairing an event type with the app-server-started params.
#[derive(Serialize)]
pub(crate) struct CodexAppServerStartedEventRequest {
/// Event name; the reducer sets this to `"codex_app_server_started"`.
pub(crate) event_type: &'static str,
/// Payload describing the app-server start.
pub(crate) event_params: CodexAppServerStartedEventParams,
}
/// Thread-start timing fields that are flattened into
/// `ThreadInitializedEventParams`. The default value has every field `None`,
/// which serializes as `null`.
#[derive(Default, Serialize)]
pub(crate) struct ThreadStartTimingEventParams {
/// Total thread-start duration in milliseconds.
pub(crate) thread_start_duration_ms: Option<u64>,
/// Duration of the prepare phase in milliseconds.
pub(crate) thread_start_prepare_duration_ms: Option<u64>,
/// Duration of the spawn phase in milliseconds.
pub(crate) thread_start_spawn_duration_ms: Option<u64>,
/// Duration of the finalize phase in milliseconds.
pub(crate) thread_start_finalize_duration_ms: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct ThreadInitializedEventParams {
pub(crate) thread_id: String,
@@ -155,6 +178,8 @@ pub(crate) struct ThreadInitializedEventParams {
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
#[serde(flatten)]
pub(crate) thread_start_timing: ThreadStartTimingEventParams,
pub(crate) created_at: u64,
}
@@ -808,6 +833,11 @@ pub(crate) struct CodexTurnEventParams {
pub(crate) reasoning_output_tokens: Option<i64>,
pub(crate) total_tokens: Option<i64>,
pub(crate) duration_ms: Option<u64>,
pub(crate) request_start_delay_ms: Option<u64>,
pub(crate) sampling_duration_ms: Option<u64>,
pub(crate) blocking_tool_critical_path_duration_ms: Option<u64>,
pub(crate) approval_wait_duration_ms: Option<u64>,
pub(crate) finalize_duration_ms: Option<u64>,
pub(crate) started_at: Option<u64>,
pub(crate) completed_at: Option<u64>,
}
@@ -1042,6 +1072,7 @@ pub(crate) fn subagent_thread_started_event_request(
parent_thread_id: input
.parent_thread_id
.or_else(|| subagent_parent_thread_id(&input.subagent_source)),
thread_start_timing: ThreadStartTimingEventParams::default(),
created_at: input.created_at,
};
ThreadInitializedEvent {

View File

@@ -99,6 +99,15 @@ pub struct TurnTokenUsageFact {
pub token_usage: TokenUsage,
}
/// Per-turn timing breakdown reported at turn completion; the reducer copies
/// these values onto the turn's `codex_turn_event`.
#[derive(Clone)]
pub struct TurnTimingBreakdownFact {
/// Turn the breakdown belongs to.
pub turn_id: String,
/// Thread that owns the turn.
pub thread_id: String,
/// Request-start delay in milliseconds, when one was recorded.
pub request_start_delay_ms: Option<u64>,
/// Sampling time accumulated across the turn, in milliseconds.
pub sampling_duration_ms: u64,
/// Blocking-tool critical-path time accumulated across the turn, in milliseconds.
pub blocking_tool_critical_path_duration_ms: u64,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnStatus {
@@ -274,6 +283,21 @@ pub struct CodexCompactionEvent {
pub duration_ms: Option<u64>,
}
/// Input fact describing a completed app-server startup; the reducer forwards
/// each field unchanged into `CodexAppServerStartedEventParams`.
pub(crate) struct AppServerStartedInput {
/// Runtime metadata of the reporting process.
pub runtime: CodexRuntimeMetadata,
/// Whether remote control was enabled for this app-server run.
pub remote_control_enabled: bool,
/// Elapsed startup time in milliseconds.
pub startup_duration_ms: u64,
/// Timestamp at which startup was recorded (filled from `now_unix_seconds()`
/// by the client).
pub completed_at: u64,
}
/// Timing of a thread-start flow, keyed by thread id. The reducer stores it and
/// attaches it to the thread-initialized event for newly created threads.
pub struct ThreadStartTimingFact {
/// Thread whose start was timed.
pub thread_id: String,
/// Total thread-start duration in milliseconds.
pub duration_ms: u64,
/// Duration of the prepare phase in milliseconds.
pub prepare_duration_ms: u64,
/// Duration of the spawn phase in milliseconds.
pub spawn_duration_ms: u64,
/// Duration of the finalize phase in milliseconds.
pub finalize_duration_ms: u64,
}
#[allow(dead_code)]
pub(crate) enum AnalyticsFact {
Initialize {
@@ -323,10 +347,13 @@ pub(crate) enum AnalyticsFact {
}
pub(crate) enum CustomAnalyticsFact {
AppServerStarted(AppServerStartedInput),
SubAgentThreadStarted(SubAgentThreadStartedInput),
ThreadStartTiming(ThreadStartTimingFact),
Compaction(Box<CodexCompactionEvent>),
GuardianReview(Box<GuardianReviewEventParams>),
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
TurnTimingBreakdown(Box<TurnTimingBreakdownFact>),
TurnTokenUsage(Box<TurnTokenUsageFact>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),

View File

@@ -10,6 +10,7 @@ use std::time::UNIX_EPOCH;
pub use accepted_lines::accepted_line_fingerprints_from_unified_diff;
pub use accepted_lines::fingerprint_hash;
pub use client::AnalyticsEventsClient;
pub use client::StartedTimer;
pub use events::AppServerRpcTransport;
pub use events::GuardianApprovalRequestSource;
pub use events::GuardianReviewAnalyticsResult;
@@ -37,12 +38,14 @@ pub use facts::InvocationType;
pub use facts::SkillInvocation;
pub use facts::SubAgentThreadStartedInput;
pub use facts::ThreadInitializationMode;
pub use facts::ThreadStartTimingFact;
pub use facts::TrackEventsContext;
pub use facts::TurnResolvedConfigFact;
pub use facts::TurnStatus;
pub use facts::TurnSteerRejectionReason;
pub use facts::TurnSteerRequestError;
pub use facts::TurnSteerResult;
pub use facts::TurnTimingBreakdownFact;
pub use facts::TurnTokenUsageFact;
pub use facts::build_track_events_context;

View File

@@ -5,6 +5,8 @@ use crate::accepted_lines::accepted_line_repo_hash_for_cwd;
use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppServerStartedEventParams;
use crate::events::CodexAppServerStartedEventRequest;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCollabAgentToolCallEventParams;
use crate::events::CodexCollabAgentToolCallEventRequest;
@@ -45,6 +47,7 @@ use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ThreadStartTimingEventParams;
use crate::events::ToolItemFailureKind;
use crate::events::ToolItemTerminalStatus;
use crate::events::TrackEventRequest;
@@ -61,6 +64,7 @@ use crate::events::subagent_thread_started_event_request;
use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppMentionedInput;
use crate::facts::AppServerStartedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CustomAnalyticsFact;
@@ -71,10 +75,12 @@ use crate::facts::PluginUsedInput;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::ThreadStartTimingFact;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRejectionReason;
use crate::facts::TurnSteerResult;
use crate::facts::TurnTimingBreakdownFact;
use crate::facts::TurnTokenUsageFact;
use crate::now_unix_seconds;
use crate::option_i64_to_u64;
@@ -147,6 +153,15 @@ struct ConnectionState {
struct ThreadAnalyticsState {
connection_id: Option<u64>,
metadata: Option<ThreadMetadataState>,
thread_start_timing: Option<ThreadStartTimingState>,
}
/// Reducer-side copy of a `ThreadStartTimingFact`'s durations, held on the
/// thread's analytics state until the thread-initialized event is built.
#[derive(Clone, Copy)]
struct ThreadStartTimingState {
/// Total thread-start duration in milliseconds.
duration_ms: u64,
/// Duration of the prepare phase in milliseconds.
prepare_duration_ms: u64,
/// Duration of the spawn phase in milliseconds.
spawn_duration_ms: u64,
/// Duration of the finalize phase in milliseconds.
finalize_duration_ms: u64,
}
#[derive(Clone, Copy)]
@@ -314,6 +329,14 @@ struct CompletedTurnState {
duration_ms: Option<u64>,
}
/// Timing accumulated for a single turn inside the reducer. The three optional
/// fields stay `None` until a `TurnTimingBreakdownFact` is ingested.
#[derive(Clone, Default)]
struct TurnTimingState {
/// Request-start delay in milliseconds, as reported by the breakdown fact.
request_start_delay_ms: Option<u64>,
/// Sampling duration in milliseconds, as reported by the breakdown fact.
sampling_duration_ms: Option<u64>,
/// Blocking-tool critical-path duration in milliseconds, as reported by the
/// breakdown fact.
blocking_tool_critical_path_duration_ms: Option<u64>,
/// Running (saturating) sum of completed review durations for this turn, in
/// milliseconds. It is only emitted on the turn event when at least one of the
/// breakdown fields above is set.
approval_wait_duration_ms: u64,
}
struct TurnState {
connection_id: Option<u64>,
thread_id: Option<String>,
@@ -325,6 +348,7 @@ struct TurnState {
latest_diff: Option<String>,
steer_count: usize,
tool_counts: TurnToolCounts,
timing: TurnTimingState,
}
#[derive(Hash, Eq, PartialEq)]
@@ -446,9 +470,15 @@ impl AnalyticsReducer {
self.ingest_server_request_aborted(completed_at_ms, request_id, out);
}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::AppServerStarted(input) => {
self.ingest_app_server_started(input, out);
}
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::ThreadStartTiming(input) => {
self.ingest_thread_start_timing(input);
}
CustomAnalyticsFact::Compaction(input) => {
self.ingest_compaction(*input, out);
}
@@ -458,6 +488,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::TurnResolvedConfig(input) => {
self.ingest_turn_resolved_config(*input, out).await;
}
CustomAnalyticsFact::TurnTimingBreakdown(input) => {
self.ingest_turn_timing_breakdown(*input, out).await;
}
CustomAnalyticsFact::TurnTokenUsage(input) => {
self.ingest_turn_token_usage(*input, out).await;
}
@@ -508,6 +541,24 @@ impl AnalyticsReducer {
);
}
fn ingest_app_server_started(
&mut self,
input: AppServerStartedInput,
out: &mut Vec<TrackEventRequest>,
) {
out.push(TrackEventRequest::AppServerStarted(
CodexAppServerStartedEventRequest {
event_type: "codex_app_server_started",
event_params: CodexAppServerStartedEventParams {
runtime: input.runtime,
remote_control_enabled: input.remote_control_enabled,
startup_duration_ms: input.startup_duration_ms,
completed_at: input.completed_at,
},
},
));
}
fn ingest_subagent_thread_started(
&mut self,
input: SubAgentThreadStartedInput,
@@ -538,6 +589,19 @@ impl AnalyticsReducer {
));
}
fn ingest_thread_start_timing(&mut self, input: ThreadStartTimingFact) {
let thread_start_timing = ThreadStartTimingState {
duration_ms: input.duration_ms,
prepare_duration_ms: input.prepare_duration_ms,
spawn_duration_ms: input.spawn_duration_ms,
finalize_duration_ms: input.finalize_duration_ms,
};
self.threads
.entry(input.thread_id)
.or_default()
.thread_start_timing = Some(thread_start_timing);
}
fn ingest_guardian_review(
&mut self,
input: GuardianReviewEventParams,
@@ -610,6 +674,7 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.thread_id = Some(thread_id);
turn_state.num_input_images = Some(num_input_images);
@@ -634,12 +699,40 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.thread_id = Some(input.thread_id);
turn_state.token_usage = Some(input.token_usage);
self.maybe_emit_turn_event(&turn_id, out).await;
}
async fn ingest_turn_timing_breakdown(
&mut self,
input: TurnTimingBreakdownFact,
out: &mut Vec<TrackEventRequest>,
) {
let turn_id = input.turn_id.clone();
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.thread_id = Some(input.thread_id);
turn_state.timing.request_start_delay_ms = input.request_start_delay_ms;
turn_state.timing.sampling_duration_ms = Some(input.sampling_duration_ms);
turn_state.timing.blocking_tool_critical_path_duration_ms =
Some(input.blocking_tool_critical_path_duration_ms);
self.maybe_emit_turn_event(&turn_id, out).await;
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,
@@ -799,6 +892,7 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.connection_id = Some(connection_id);
turn_state.thread_id = Some(pending_request.thread_id);
@@ -1158,6 +1252,7 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.started_at = notification
.turn
@@ -1179,6 +1274,7 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.thread_id = Some(notification.thread_id);
turn_state.latest_diff = Some(notification.diff);
@@ -1198,6 +1294,7 @@ impl AnalyticsReducer {
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.completed = Some(CompletedTurnState {
status: analytics_turn_status(notification.turn.status),
@@ -1240,13 +1337,12 @@ impl AnalyticsReducer {
thread.thread_source.map(Into::into),
initialization_mode,
);
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
connection_id: Some(connection_id),
metadata: Some(thread_metadata.clone()),
},
);
let thread_state = self.threads.entry(thread_id.clone()).or_default();
thread_state.connection_id = Some(connection_id);
thread_state.metadata = Some(thread_metadata.clone());
let thread_start_timing = matches!(initialization_mode, ThreadInitializationMode::New)
.then_some(thread_state.thread_start_timing)
.flatten();
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
@@ -1260,6 +1356,15 @@ impl AnalyticsReducer {
initialization_mode,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id,
thread_start_timing: match thread_start_timing {
Some(timing) => ThreadStartTimingEventParams {
thread_start_duration_ms: Some(timing.duration_ms),
thread_start_prepare_duration_ms: Some(timing.prepare_duration_ms),
thread_start_spawn_duration_ms: Some(timing.spawn_duration_ms),
thread_start_finalize_duration_ms: Some(timing.finalize_duration_ms),
},
None => ThreadStartTimingEventParams::default(),
},
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
},
@@ -1403,6 +1508,7 @@ impl AnalyticsReducer {
completed_at_ms: u64,
out: &mut Vec<TrackEventRequest>,
) {
let duration_ms = observed_duration_ms(pending_review.started_at_ms, completed_at_ms);
if let Some(item_key) = item_review_summary_key(&pending_review) {
self.record_item_review_summary(
item_key,
@@ -1412,6 +1518,28 @@ impl AnalyticsReducer {
&pending_review,
);
}
if let Some(duration_ms) = duration_ms {
let turn_state =
self.turns
.entry(pending_review.turn_id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
timing: TurnTimingState::default(),
});
turn_state.timing.approval_wait_duration_ms = turn_state
.timing
.approval_wait_duration_ms
.saturating_add(duration_ms);
}
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::review(&pending_review))
else {
@@ -1437,7 +1565,7 @@ impl AnalyticsReducer {
resolution,
started_at_ms: pending_review.started_at_ms,
completed_at_ms,
duration_ms: observed_duration_ms(pending_review.started_at_ms, completed_at_ms),
duration_ms,
},
}));
}
@@ -2445,6 +2573,26 @@ fn codex_turn_event_params(
is_first_turn,
} = resolved_config;
let token_usage = turn_state.token_usage.clone();
let timing = &turn_state.timing;
let has_turn_timing = timing.request_start_delay_ms.is_some()
|| timing.sampling_duration_ms.is_some()
|| timing.blocking_tool_critical_path_duration_ms.is_some();
let approval_wait_duration_ms = has_turn_timing.then_some(timing.approval_wait_duration_ms);
let finalize_duration_ms = completed.duration_ms.and_then(|duration_ms| {
has_turn_timing.then_some(
duration_ms.saturating_sub(
timing
.request_start_delay_ms
.unwrap_or_default()
.saturating_add(timing.sampling_duration_ms.unwrap_or_default())
.saturating_add(
timing
.blocking_tool_critical_path_duration_ms
.unwrap_or_default(),
),
),
)
});
CodexTurnEventParams {
thread_id,
turn_id,
@@ -2501,6 +2649,11 @@ fn codex_turn_event_params(
.as_ref()
.map(|token_usage| token_usage.total_tokens),
duration_ms: completed.duration_ms,
request_start_delay_ms: timing.request_start_delay_ms,
sampling_duration_ms: timing.sampling_duration_ms,
blocking_tool_critical_path_duration_ms: timing.blocking_tool_critical_path_duration_ms,
approval_wait_duration_ms,
finalize_duration_ms,
started_at,
completed_at: Some(completed.completed_at),
}

View File

@@ -41,6 +41,7 @@ use crate::transport::start_remote_control;
use crate::transport::start_stdio_connection;
use crate::transport::start_websocket_acceptor;
use codex_analytics::AppServerRpcTransport;
use codex_analytics::StartedTimer;
use codex_app_server_protocol::ConfigLayerSource;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCMessage;
@@ -428,6 +429,7 @@ pub async fn run_main_with_transport_options(
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
let app_server_start_timer = StartedTimer::start();
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
@@ -698,6 +700,9 @@ pub async fn run_main_with_transport_options(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let analytics_transport = analytics_rpc_transport(&transport);
let remote_control_requested = runtime_options.remote_control_enabled;
let remote_control_enabled = remote_control_requested && state_db.is_some();
@@ -787,8 +792,7 @@ pub async fn run_main_with_transport_options(
let processor_handle = tokio::spawn({
let auth_manager = Arc::clone(&auth_manager);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let analytics_events_client = analytics_events_client.clone();
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
@@ -809,7 +813,7 @@ pub async fn run_main_with_transport_options(
session_source,
auth_manager,
installation_id,
rpc_transport: analytics_rpc_transport(&transport),
rpc_transport: analytics_transport,
remote_control_handle: Some(remote_control_handle.clone()),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
}));
@@ -1065,6 +1069,8 @@ pub async fn run_main_with_transport_options(
info!("processor task exited (channel closed)");
}
});
analytics_events_client
.track_app_server_started(app_server_start_timer, remote_control_enabled);
drop(transport_event_tx);

View File

@@ -21,6 +21,12 @@ use wiremock::matchers::path;
const SERVICE_VERSION: &str = "0.0.0-test";
/// Upper bound used when waiting for an analytics event on macOS and Windows.
// NOTE(review): presumably longer because CI is slower on these targets — confirm.
#[cfg(any(target_os = "macos", windows))]
pub(crate) const ANALYTICS_TEST_TIMEOUT: Duration = Duration::from_secs(25);
/// Upper bound used when waiting for an analytics event on every other target.
#[cfg(not(any(target_os = "macos", windows)))]
pub(crate) const ANALYTICS_TEST_TIMEOUT: Duration = Duration::from_secs(10);
fn set_metrics_exporter(config: &mut codex_core::config::Config) {
config.otel.metrics_exporter = OtelExporterKind::OtlpHttp {
endpoint: "http://localhost:4318".to_string(),
@@ -98,28 +104,6 @@ pub(crate) async fn mount_analytics_capture(server: &MockServer, codex_home: &Pa
Ok(())
}
/// Polls the mock server every 25 ms until it has recorded a `POST` to
/// `/codex/analytics-events/events`, then returns that request body parsed as JSON.
///
/// Fails if `read_timeout` elapses first or if the body is not valid JSON.
pub(crate) async fn wait_for_analytics_payload(
    server: &MockServer,
    read_timeout: Duration,
) -> Result<Value> {
    let poll_for_body = async {
        loop {
            // `received_requests` yields `None` when nothing is available yet;
            // that case and "no matching request" both fall through to the sleep.
            let matching_body = server.received_requests().await.and_then(|recorded| {
                recorded
                    .iter()
                    .find(|request| {
                        request.method == "POST"
                            && request.url.path() == "/codex/analytics-events/events"
                    })
                    .map(|request| request.body.clone())
            });
            if let Some(raw_body) = matching_body {
                break raw_body;
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    };
    let body = timeout(read_timeout, poll_for_body).await?;
    serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
}
pub(crate) async fn wait_for_analytics_event(
server: &MockServer,
read_timeout: Duration,
@@ -133,7 +117,10 @@ pub(crate) async fn wait_for_analytics_event(
};
for request in &requests {
if request.method != "POST"
|| request.url.path() != "/codex/analytics-events/events"
|| !request
.url
.path()
.ends_with("/codex/analytics-events/events")
{
continue;
}
@@ -155,16 +142,6 @@ pub(crate) async fn wait_for_analytics_event(
.await?
}
/// Returns the first `codex_thread_initialized` event in an analytics payload.
///
/// Fails when the payload has no `events` array or when no event of that type
/// is present.
pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> {
    let Some(events) = payload["events"].as_array() else {
        return Err(anyhow::anyhow!("analytics payload missing events array"));
    };
    for candidate in events {
        if candidate["event_type"] == "codex_thread_initialized" {
            return Ok(candidate);
        }
    }
    Err(anyhow::anyhow!(
        "codex_thread_initialized event should be present"
    ))
}
pub(crate) fn assert_basic_thread_initialized_event(
event: &Value,
thread_id: &str,

View File

@@ -58,6 +58,8 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
use super::analytics::wait_for_analytics_event;
// Plugin install tests wait on connector discovery after the install response path
// starts, which is noticeably slower on Windows CI.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
@@ -727,22 +729,22 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payload = wait_for_plugin_analytics_payload(&analytics_server).await?;
let event =
wait_for_analytics_event(&analytics_server, DEFAULT_TIMEOUT, "codex_plugin_installed")
.await?;
assert_eq!(
payload,
event,
json!({
"events": [{
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
})
);
Ok(())
@@ -780,22 +782,21 @@ async fn plugin_install_tracks_remote_plugin_analytics_event() -> Result<()> {
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payload = wait_for_plugin_analytics_payload(&server).await?;
let event =
wait_for_analytics_event(&server, DEFAULT_TIMEOUT, "codex_plugin_installed").await?;
assert_eq!(
payload,
event,
json!({
"events": [{
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": REMOTE_PLUGIN_ID,
"plugin_name": "linear",
"marketplace_name": "openai-curated-remote",
"has_skills": true,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": REMOTE_PLUGIN_ID,
"plugin_name": "linear",
"marketplace_name": "openai-curated-remote",
"has_skills": true,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
})
);
Ok(())
@@ -1288,29 +1289,6 @@ async fn mount_backend_analytics_events(server: &MockServer) {
.await;
}
/// Polls the mock server every 25 ms until it has recorded a `POST` whose path
/// ends with `/codex/analytics-events/events`, then returns its body parsed as JSON.
///
/// Fails if `DEFAULT_TIMEOUT` elapses first or if the body is not valid JSON.
async fn wait_for_plugin_analytics_payload(server: &MockServer) -> Result<serde_json::Value> {
    timeout(DEFAULT_TIMEOUT, async {
        loop {
            if let Some(recorded) = server.received_requests().await {
                let analytics_request = recorded.iter().find(|request| {
                    let is_post = request.method == "POST";
                    is_post
                        && request
                            .url
                            .path()
                            .ends_with("/codex/analytics-events/events")
                });
                if let Some(request) = analytics_request {
                    return serde_json::from_slice(&request.body)
                        .map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"));
                }
            }
            // Nothing recorded yet, or no matching request: wait and poll again.
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    })
    .await?
}
fn write_remote_plugin_catalog_config(
codex_home: &std::path::Path,
base_url: &str,

View File

@@ -383,7 +383,13 @@ plugin_sharing = false
.received_requests()
.await
.expect("wiremock should record requests")
.is_empty()
.iter()
.all(|request| {
request
.url
.path()
.ends_with("/codex/analytics-events/events")
})
);
Ok(())
}

View File

@@ -24,6 +24,8 @@ use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::analytics::wait_for_analytics_event;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const REMOTE_PLUGIN_ID: &str = "plugins~Plugin_linear";
const WORKSPACE_REMOTE_PLUGIN_ID: &str = "plugins_69f27c3e67848191a45cbaa5f2adb39d";
@@ -116,37 +118,25 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
let event = wait_for_analytics_event(
&analytics_server,
DEFAULT_TIMEOUT,
"codex_plugin_uninstalled",
)
.await?;
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
event,
json!({
"events": [{
"event_type": "codex_plugin_uninstalled",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
"event_type": "codex_plugin_uninstalled",
"event_params": {
"plugin_id": "sample-plugin@debug",
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
})
);
Ok(())

View File

@@ -17,6 +17,8 @@ use codex_config::types::AuthCredentialsStoreMode;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -150,8 +152,12 @@ impl BlockingRemoteControlBackend {
let (enroll_request_tx, enroll_request_rx) = oneshot::channel();
let server_task = tokio::spawn(async move {
match read_enroll_request(listener).await {
Ok((request_line, _reader)) => {
Ok(PendingHttpRequest {
request_line,
connection,
}) => {
let _ = enroll_request_tx.send(Ok(request_line));
let _connection = connection;
std::future::pending::<()>().await;
}
Err(err) => {
@@ -181,20 +187,48 @@ impl Drop for BlockingRemoteControlBackend {
}
}
async fn read_enroll_request(listener: TcpListener) -> Result<(String, BufReader<TcpStream>)> {
let (stream, _) = listener.accept().await?;
let mut reader = BufReader::new(stream);
let mut request_line = String::new();
reader.read_line(&mut request_line).await?;
loop {
let mut line = String::new();
reader.read_line(&mut line).await?;
if line == "\r\n" {
break;
}
}
Ok((request_line.trim_end().to_string(), reader))
/// An HTTP request whose headers have been read but which has not been answered.
struct PendingHttpRequest {
/// First line of the request, with trailing whitespace trimmed.
request_line: String,
/// The buffered connection the request arrived on; holding it keeps the
/// connection open while no response is written.
connection: BufReader<TcpStream>,
}
/// Accepts connections on `listener` until a request that is not an analytics
/// upload arrives, and returns it as a [`PendingHttpRequest`] with the
/// connection still open and unanswered.
///
/// Requests whose request line contains `/codex/analytics-events/events ` are
/// answered with an empty `200 OK` and skipped, then the next connection is
/// accepted.
///
/// # Errors
///
/// Returns an error when accepting, reading, or writing fails, and an
/// `UnexpectedEof` I/O error when the peer closes the connection before the
/// blank line that terminates the headers.
async fn read_enroll_request(listener: TcpListener) -> Result<PendingHttpRequest> {
    loop {
        let (stream, _) = listener.accept().await?;
        let mut reader = BufReader::new(stream);

        let mut request_line = String::new();
        // `read_line` returns 0 at end of stream; without this check a peer that
        // disconnects early would make the header loop below spin forever.
        if reader.read_line(&mut request_line).await? == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "connection closed before the request line was received",
            )
            .into());
        }

        let mut content_length = 0usize;
        let mut line = String::new();
        loop {
            line.clear();
            if reader.read_line(&mut line).await? == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "connection closed before the request headers were complete",
                )
                .into());
            }
            if line == "\r\n" {
                break;
            }
            // HTTP field names are case-insensitive, so accept `content-length`
            // as well as `Content-Length`.
            if let Some((name, value)) = line.split_once(':') {
                if name.trim().eq_ignore_ascii_case("content-length") {
                    content_length = value.trim().parse().unwrap_or_default();
                }
            }
        }

        // Drain the body so it is not left unread on the socket.
        if content_length > 0 {
            let mut body = vec![0; content_length];
            reader.read_exact(&mut body).await?;
        }

        let request_line = request_line.trim_end().to_string();
        // The trailing space ends the match at the end of the request target.
        if request_line.contains("/codex/analytics-events/events ") {
            reader
                .get_mut()
                .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
                .await?;
            continue;
        }

        return Ok(PendingHttpRequest {
            request_line,
            connection: reader,
        });
    }
}

View File

@@ -41,10 +41,10 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::analytics::ANALYTICS_TEST_TIMEOUT;
use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
use super::analytics::wait_for_analytics_event;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -403,9 +403,10 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
.await??;
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
let event = thread_initialized_event(&payload)?;
assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "forked", "user");
let event =
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
.await?;
assert_basic_thread_initialized_event(&event, &thread.id, "mock-model", "forked", "user");
Ok(())
}

View File

@@ -92,10 +92,10 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::analytics::ANALYTICS_TEST_TIMEOUT;
use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
use super::analytics::wait_for_analytics_event;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -419,9 +419,10 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
);
assert_eq!(thread.thread_source, Some(ThreadSource::User));
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
let event = thread_initialized_event(&payload)?;
assert_basic_thread_initialized_event(event, &thread.id, "gpt-5.3-codex", "resumed", "user");
let event =
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
.await?;
assert_basic_thread_initialized_event(&event, &thread.id, "gpt-5.3-codex", "resumed", "user");
assert_eq!(event["event_params"]["thread_source"], "user");
Ok(())
}

View File

@@ -44,10 +44,10 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::analytics::ANALYTICS_TEST_TIMEOUT;
use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
use super::analytics::wait_for_analytics_event;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
@@ -436,10 +436,18 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
assert_eq!(payload["events"].as_array().expect("events array").len(), 1);
let event = thread_initialized_event(&payload)?;
assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "new", "user");
let event =
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
.await?;
assert_basic_thread_initialized_event(&event, &thread.id, "mock-model", "new", "user");
let thread_start_timings = [
"thread_start_duration_ms",
"thread_start_prepare_duration_ms",
"thread_start_spawn_duration_ms",
"thread_start_finalize_duration_ms",
]
.map(|field| event["event_params"][field].as_u64().is_some());
assert_eq!(thread_start_timings, [true, true, true, true]);
Ok(())
}

View File

@@ -137,6 +137,7 @@ pub(crate) mod state_db_bridge;
pub use state_db_bridge::StateDbHandle;
pub use state_db_bridge::init_state_db;
mod thread_rollout_truncation;
mod thread_start_timing;
mod tools;
pub(crate) mod turn_diff_tracker;
mod turn_metadata;

View File

@@ -1735,7 +1735,12 @@ async fn try_run_sampling_request(
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let mut stream = client_session
turn_context
.turn_timing_state
.mark_model_request_started()
.await;
turn_context.turn_timing_state.mark_sampling_started().await;
let stream_result = client_session
.stream(
prompt,
&turn_context.model_info,
@@ -1748,7 +1753,24 @@ async fn try_run_sampling_request(
)
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)
.await??;
.await;
let mut stream = match stream_result {
Ok(Ok(stream)) => stream,
Ok(Err(err)) => {
turn_context
.turn_timing_state
.mark_sampling_completed()
.await;
return Err(err);
}
Err(codex_async_utils::CancelErr::Cancelled) => {
turn_context
.turn_timing_state
.mark_sampling_completed()
.await;
return Err(CodexErr::TurnAborted);
}
};
let mut in_flight: FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>> =
FuturesOrdered::new();
let mut needs_follow_up = false;
@@ -2153,6 +2175,10 @@ async fn try_run_sampling_request(
&mut assistant_message_stream_parsers,
)
.await;
turn_context
.turn_timing_state
.mark_sampling_completed()
.await;
if sess
.features
@@ -2163,7 +2189,17 @@ async fn try_run_sampling_request(
client_session.send_response_processed(response_id).await;
}
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
turn_context
.turn_timing_state
.mark_blocking_tool_critical_path_started()
.await;
let drain_in_flight_result =
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await;
turn_context
.turn_timing_state
.mark_blocking_tool_critical_path_completed()
.await;
drain_in_flight_result?;
if should_emit_token_count {
// A tool call such as request_user_input can intentionally pause the turn. Emit token

View File

@@ -6,8 +6,9 @@ mod user_shell;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use codex_analytics::TurnTimingBreakdownFact;
use codex_analytics::TurnTokenUsageFact;
use codex_extension_api::ExtensionData;
use futures::future::BoxFuture;
use tokio::select;
@@ -33,7 +34,6 @@ use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
use codex_login::AuthManager;
use codex_models_manager::manager::SharedModelsManager;
use codex_otel::SessionTelemetry;
@@ -318,11 +318,7 @@ impl Session {
let task: Arc<dyn AnySessionTask> = Arc::new(task);
let task_kind = task.kind();
let span_name = task.span_name();
let started_at = Instant::now();
let turn_started_at_unix_ms = turn_context
.turn_timing_state
.mark_turn_started(started_at)
.await;
let turn_started_at_unix_ms = turn_context.turn_timing_state.mark_turn_started().await;
turn_context
.turn_metadata_state
.set_turn_started_at_unix_ms(turn_started_at_unix_ms);
@@ -773,10 +769,21 @@ impl Session {
.turn_timing_state
.completed_at_and_duration_ms()
.await;
let turn_timing_breakdown = turn_context.turn_timing_state.timing_breakdown().await;
let time_to_first_token_ms = turn_context
.turn_timing_state
.time_to_first_token_ms()
.await;
self.services
.analytics_events_client
.track_turn_timing(TurnTimingBreakdownFact {
turn_id: turn_context.sub_id.clone(),
thread_id: self.conversation_id.to_string(),
request_start_delay_ms: turn_timing_breakdown.request_start_delay_ms,
sampling_duration_ms: turn_timing_breakdown.sampling_duration_ms,
blocking_tool_critical_path_duration_ms: turn_timing_breakdown
.blocking_tool_critical_path_duration_ms,
});
if should_clear_active_turn {
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;

View File

@@ -15,6 +15,7 @@ use crate::session::INITIAL_SUBMIT_ID;
use crate::shell_snapshot::ShellSnapshot;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use crate::thread_start_timing::ThreadStartTiming;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
@@ -1188,6 +1189,7 @@ impl ThreadManagerState {
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
let is_resumed_thread = matches!(&initial_history, InitialHistory::Resumed(_));
let mut thread_start_timing = ThreadStartTiming::start();
if let InitialHistory::Resumed(resumed) = &initial_history {
let mut threads = self.threads.write().await;
if let Some(thread) = threads.get(&resumed.conversation_id).cloned() {
@@ -1200,6 +1202,12 @@ impl ThreadManagerState {
resumed.conversation_id
)));
}
thread_start_timing.mark_prepare_completed();
if let Some(analytics_events_client) = self.analytics_events_client.as_ref() {
analytics_events_client.track_thread_start_timing(
thread_start_timing.into_fact(resumed.conversation_id.to_string()),
);
}
return Ok(NewThread {
thread_id: resumed.conversation_id,
session_configured: thread.session_configured(),
@@ -1214,6 +1222,7 @@ impl ThreadManagerState {
let parent_rollout_thread_trace = self
.parent_rollout_thread_trace_for_source(&session_source, &initial_history)
.await;
thread_start_timing.mark_prepare_completed();
let tracked_session_source = session_source.clone();
let CodexSpawnOk {
codex, thread_id, ..
@@ -1245,9 +1254,16 @@ impl ThreadManagerState {
attestation_provider: self.attestation_provider.clone(),
})
.await?;
thread_start_timing.mark_spawn_completed();
let new_thread = self
.finalize_thread_spawn(codex, thread_id, tracked_session_source)
.await?;
thread_start_timing.mark_finalize_completed();
if let Some(analytics_events_client) = self.analytics_events_client.as_ref() {
analytics_events_client.track_thread_start_timing(
thread_start_timing.into_fact(new_thread.thread_id.to_string()),
);
}
if is_resumed_thread {
new_thread.thread.emit_thread_resume_lifecycle().await;
if let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await {

View File

@@ -0,0 +1,58 @@
use std::time::Duration;
use std::time::Instant;
use codex_analytics::ThreadStartTimingFact;
/// Stopwatch that splits a thread start into consecutive prepare, spawn, and
/// finalize phases.
#[derive(Debug)]
pub(crate) struct ThreadStartTiming {
/// When the phase currently being measured began; reset each time a phase is finished.
phase_started_at: Instant,
/// Length of the prepare phase; `None` until `mark_prepare_completed` is called.
prepare_duration: Option<Duration>,
/// Length of the spawn phase; `None` until `mark_spawn_completed` is called.
spawn_duration: Option<Duration>,
/// Length of the finalize phase; `None` until `mark_finalize_completed` is called.
finalize_duration: Option<Duration>,
}
impl ThreadStartTiming {
    /// Creates a timer whose first phase (prepare) starts now.
    pub(crate) fn start() -> Self {
        Self {
            phase_started_at: Instant::now(),
            prepare_duration: None,
            spawn_duration: None,
            finalize_duration: None,
        }
    }

    /// Ends the current phase and records its length as the prepare duration.
    pub(crate) fn mark_prepare_completed(&mut self) {
        self.prepare_duration = Some(self.finish_phase());
    }

    /// Ends the current phase and records its length as the spawn duration.
    pub(crate) fn mark_spawn_completed(&mut self) {
        self.spawn_duration = Some(self.finish_phase());
    }

    /// Ends the current phase and records its length as the finalize duration.
    pub(crate) fn mark_finalize_completed(&mut self) {
        self.finalize_duration = Some(self.finish_phase());
    }

    /// Converts the recorded phases into a [`ThreadStartTimingFact`] for `thread_id`.
    ///
    /// Phases that were never marked count as zero, and the overall duration is
    /// the sum of the three phase durations.
    pub(crate) fn into_fact(self, thread_id: String) -> ThreadStartTimingFact {
        let prepare_duration = self.prepare_duration.unwrap_or_default();
        let spawn_duration = self.spawn_duration.unwrap_or_default();
        let finalize_duration = self.finalize_duration.unwrap_or_default();
        // Saturate rather than panic on overflow.
        let total_duration = prepare_duration
            .saturating_add(spawn_duration)
            .saturating_add(finalize_duration);
        ThreadStartTimingFact {
            thread_id,
            duration_ms: duration_ms(total_duration),
            prepare_duration_ms: duration_ms(prepare_duration),
            spawn_duration_ms: duration_ms(spawn_duration),
            finalize_duration_ms: duration_ms(finalize_duration),
        }
    }

    /// Returns the time since the current phase began and starts the next phase.
    fn finish_phase(&mut self) -> Duration {
        // One clock reading is both the end of this phase and the start of the
        // next, so no time falls between consecutive phases.
        let now = Instant::now();
        let duration = now.duration_since(self.phase_started_at);
        self.phase_started_at = now;
        duration
    }
}
/// Converts `duration` to whole milliseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_ms(duration: Duration) -> u64 {
    let total_millis: u128 = duration.as_millis();
    match u64::try_from(total_millis) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}

View File

@@ -36,6 +36,13 @@ pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &T
.record_duration(TURN_TTFM_DURATION_METRIC, duration, &[]);
}
/// Per-turn timing totals produced by `TurnTimingState::timing_breakdown`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct TurnTimingBreakdown {
/// Milliseconds between the turn start and the first model request start;
/// `None` when either instant has not been recorded.
pub(crate) request_start_delay_ms: Option<u64>,
/// Milliseconds accumulated over completed sampling intervals.
pub(crate) sampling_duration_ms: u64,
/// Milliseconds accumulated over completed blocking-tool critical-path intervals.
pub(crate) blocking_tool_critical_path_duration_ms: u64,
}
#[derive(Debug, Default)]
pub(crate) struct TurnTimingState {
state: Mutex<TurnTimingStateInner>,
@@ -47,16 +54,27 @@ struct TurnTimingStateInner {
started_at_unix_secs: Option<i64>,
first_token_at: Option<Instant>,
first_message_at: Option<Instant>,
first_request_started_at: Option<Instant>,
sampling_started_at: Option<Instant>,
sampling_duration: Duration,
blocking_tool_critical_path_started_at: Option<Instant>,
blocking_tool_critical_path_duration: Duration,
}
impl TurnTimingState {
pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 {
pub(crate) async fn mark_turn_started(&self) -> i64 {
let started_at = Instant::now();
let started_at_unix_ms = now_unix_timestamp_ms();
let mut state = self.state.lock().await;
state.started_at = Some(started_at);
state.started_at_unix_secs = Some(started_at_unix_ms / 1000);
state.first_token_at = None;
state.first_message_at = None;
state.first_request_started_at = None;
state.sampling_started_at = None;
state.sampling_duration = Duration::default();
state.blocking_tool_critical_path_started_at = None;
state.blocking_tool_critical_path_duration = Duration::default();
started_at_unix_ms
}
@@ -98,6 +116,54 @@ impl TurnTimingState {
let mut state = self.state.lock().await;
state.record_turn_ttfm()
}
/// Records when the first model request started; later calls leave the
/// recorded instant untouched.
pub(crate) async fn mark_model_request_started(&self) {
    let mut guard = self.state.lock().await;
    guard.first_request_started_at.get_or_insert_with(Instant::now);
}
/// Opens a sampling interval unless one is already open.
pub(crate) async fn mark_sampling_started(&self) {
    let mut guard = self.state.lock().await;
    guard.sampling_started_at.get_or_insert_with(Instant::now);
}
/// Closes the open sampling interval, if any, and adds its length to the
/// accumulated sampling duration.
pub(crate) async fn mark_sampling_completed(&self) {
    let mut guard = self.state.lock().await;
    // No open interval means there is nothing to accumulate.
    let Some(interval_start) = guard.sampling_started_at.take() else {
        return;
    };
    let interval = interval_start.elapsed();
    guard.sampling_duration = guard.sampling_duration.saturating_add(interval);
}
/// Opens a blocking-tool critical-path interval unless one is already open.
pub(crate) async fn mark_blocking_tool_critical_path_started(&self) {
    let mut guard = self.state.lock().await;
    guard
        .blocking_tool_critical_path_started_at
        .get_or_insert_with(Instant::now);
}
/// Closes the open blocking-tool critical-path interval, if any, and adds its
/// length to the accumulated total.
pub(crate) async fn mark_blocking_tool_critical_path_completed(&self) {
    let mut guard = self.state.lock().await;
    // No open interval means there is nothing to accumulate.
    let Some(interval_start) = guard.blocking_tool_critical_path_started_at.take() else {
        return;
    };
    let interval = interval_start.elapsed();
    guard.blocking_tool_critical_path_duration = guard
        .blocking_tool_critical_path_duration
        .saturating_add(interval);
}
/// Returns the timing totals recorded so far for the turn, in milliseconds.
///
/// Only completed intervals are included; an interval that is still open
/// contributes nothing.
pub(crate) async fn timing_breakdown(&self) -> TurnTimingBreakdown {
    let guard = self.state.lock().await;
    let request_start_delay_ms = guard.request_start_delay_ms();
    let sampling_duration_ms = duration_to_u64_millis(guard.sampling_duration);
    let blocking_tool_critical_path_duration_ms =
        duration_to_u64_millis(guard.blocking_tool_critical_path_duration);
    TurnTimingBreakdown {
        request_start_delay_ms,
        sampling_duration_ms,
        blocking_tool_critical_path_duration_ms,
    }
}
}
fn now_unix_timestamp_secs() -> i64 {
@@ -112,6 +178,13 @@ pub(crate) fn now_unix_timestamp_ms() -> i64 {
}
impl TurnTimingStateInner {
/// Milliseconds between the turn start and the first model request start, or
/// `None` when either instant has not been recorded.
fn request_start_delay_ms(&self) -> Option<u64> {
    self.first_request_started_at
        .zip(self.started_at)
        .map(|(request_started_at, turn_started_at)| {
            duration_to_u64_millis(request_started_at.duration_since(turn_started_at))
        })
}
fn time_to_first_token(&self) -> Option<Duration> {
Some(self.first_token_at?.duration_since(self.started_at?))
}
@@ -136,6 +209,10 @@ impl TurnTimingStateInner {
}
}
/// Converts `duration` to whole milliseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_to_u64_millis(duration: Duration) -> u64 {
    match duration.as_millis().try_into() {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
match event {
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {

View File

@@ -4,9 +4,10 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use std::time::Instant;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::time::sleep;
use super::TurnTimingState;
use super::response_item_records_turn_ttft;
@@ -22,7 +23,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
None
);
state.mark_turn_started(Instant::now()).await;
state.mark_turn_started().await;
assert_eq!(
state
.record_ttft_for_response_event(&ResponseEvent::Created)
@@ -46,7 +47,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
#[tokio::test]
async fn turn_timing_state_records_ttfm_independently_of_ttft() {
let state = TurnTimingState::default();
state.mark_turn_started(Instant::now()).await;
state.mark_turn_started().await;
assert!(
state
@@ -86,7 +87,7 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
.expect("system time should be after unix epoch")
.as_millis();
let started_at_unix_ms = state.mark_turn_started(Instant::now()).await;
let started_at_unix_ms = state.mark_turn_started().await;
let after = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -99,6 +100,28 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
);
}
// Checks that sampling intervals accumulate across start/complete pairs, that
// the blocking-tool interval is tracked separately, and that a request start
// delay is reported once a model request has been marked.
#[tokio::test]
async fn turn_timing_state_tracks_request_start_and_duration_breakdown() {
let state = TurnTimingState::default();
state.mark_turn_started().await;
state.mark_model_request_started().await;
// First sampling interval: at least 10 ms.
state.mark_sampling_started().await;
sleep(Duration::from_millis(10)).await;
state.mark_sampling_completed().await;
// Second sampling interval: at least 5 ms more, added to the first.
state.mark_sampling_started().await;
sleep(Duration::from_millis(5)).await;
state.mark_sampling_completed().await;
// One blocking-tool critical-path interval of at least 5 ms.
state.mark_blocking_tool_critical_path_started().await;
sleep(Duration::from_millis(5)).await;
state.mark_blocking_tool_critical_path_completed().await;
let breakdown = state.timing_breakdown().await;
// Lower bounds only: the sleeps guarantee a minimum, not an exact duration.
assert!(breakdown.sampling_duration_ms >= 15);
assert!(breakdown.blocking_tool_critical_path_duration_ms >= 5);
assert!(breakdown.request_start_delay_ms.is_some());
}
#[test]
fn response_item_records_turn_ttft_for_first_output_signals() {
assert!(response_item_records_turn_ttft(