mirror of
https://github.com/openai/codex.git
synced 2026-06-02 19:31:59 +00:00
Compare commits
10 Commits
fcoury/tok
...
aibrahim/t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38e0a59814 | ||
|
|
688f52aac9 | ||
|
|
c7b6f807f1 | ||
|
|
3f40c4eabf | ||
|
|
5f31aa4469 | ||
|
|
2425201406 | ||
|
|
ba193ee95b | ||
|
|
f71b306e3f | ||
|
|
f29c0be3ff | ||
|
|
2adcc3e32e |
@@ -1,5 +1,6 @@
|
||||
use crate::client::AnalyticsEventsQueue;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::AppServerStartedEventRequest;
|
||||
use crate::events::CodexAcceptedLineFingerprintsEventParams;
|
||||
use crate::events::CodexAcceptedLineFingerprintsEventRequest;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
@@ -15,6 +16,7 @@ use crate::events::CodexReviewEventParams;
|
||||
use crate::events::CodexReviewEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexToolItemEventBase;
|
||||
use crate::events::CodexTurnEventParams;
|
||||
use crate::events::CodexTurnEventRequest;
|
||||
use crate::events::FinalApprovalOutcome;
|
||||
use crate::events::GuardianApprovalRequestSource;
|
||||
@@ -41,6 +43,7 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CompactionImplementation;
|
||||
@@ -61,10 +64,12 @@ use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationMode;
|
||||
use crate::facts::ThreadInitializationTimingFact;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRequestError;
|
||||
use crate::facts::TurnTimingFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::reducer::normalize_path_for_skill_id;
|
||||
@@ -331,6 +336,16 @@ fn sample_turn_token_usage_fact(thread_id: &str, turn_id: &str) -> TurnTokenUsag
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_timing_fact(thread_id: &str, turn_id: &str) -> TurnTimingFact {
|
||||
TurnTimingFact {
|
||||
turn_id: turn_id.to_string(),
|
||||
thread_id: thread_id.to_string(),
|
||||
request_start_delay_ms: Some(120),
|
||||
sampling_duration_ms: 900,
|
||||
blocking_tool_critical_path_duration_ms: 140,
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_completed_notification(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
@@ -1367,6 +1382,37 @@ fn thread_initialized_event_serializes_expected_shape() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_server_started_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::AppServerStarted(AppServerStartedEventRequest {
|
||||
event_type: "app_server_started",
|
||||
event_params: AppServerStartedInput {
|
||||
runtime: sample_runtime_metadata(),
|
||||
remote_control_enabled: true,
|
||||
duration_ms: 987,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize app-server started event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "app_server_started",
|
||||
"event_params": {
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"remote_control_enabled": true,
|
||||
"duration_ms": 987
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_execution_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
|
||||
@@ -1645,6 +1691,43 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_server_started_fact_emits_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppServerStarted(
|
||||
AppServerStartedInput {
|
||||
runtime: sample_runtime_metadata(),
|
||||
remote_control_enabled: true,
|
||||
duration_ms: 456,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "app_server_started",
|
||||
"event_params": {
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"remote_control_enabled": true,
|
||||
"duration_ms": 456
|
||||
}
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unrelated_client_requests_are_ignored_by_reducer() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -2554,6 +2637,44 @@ fn subagent_thread_started_other_serializes_explicit_parent_thread_id() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_initialization_timing_fact_emits_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadInitializationTiming(
|
||||
ThreadInitializationTimingFact {
|
||||
thread_id: "thread-review".to_string(),
|
||||
initialization_mode: ThreadInitializationMode::Forked,
|
||||
duration_ms: 222,
|
||||
prepare_duration_ms: 12,
|
||||
spawn_duration_ms: 123,
|
||||
finalize_duration_ms: 87,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "thread_initialization_timing",
|
||||
"event_params": {
|
||||
"thread_id": "thread-review",
|
||||
"initialization_mode": "forked",
|
||||
"duration_ms": 222,
|
||||
"prepare_duration_ms": 12,
|
||||
"spawn_duration_ms": 123,
|
||||
"finalize_duration_ms": 87,
|
||||
}
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subagent_thread_started_publishes_without_initialize() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -3211,7 +3332,7 @@ async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
fn turn_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest {
|
||||
event_type: "codex_turn_event",
|
||||
event_params: crate::events::CodexTurnEventParams {
|
||||
event_params: CodexTurnEventParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
session_id: "session-thread-2".to_string(),
|
||||
turn_id: "turn-2".to_string(),
|
||||
@@ -3640,6 +3761,36 @@ async fn turn_lifecycle_emits_turn_event() {
|
||||
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_timing_fact_emits_dedicated_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTiming(Box::new(
|
||||
sample_turn_timing_fact("thread-2", "turn-2"),
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&out).expect("serialize timing event");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "turn_timing",
|
||||
"event_params": {
|
||||
"thread_id": "thread-2",
|
||||
"turn_id": "turn-2",
|
||||
"request_start_delay_ms": 120,
|
||||
"sampling_duration_ms": 900,
|
||||
"blocking_tool_critical_path_duration_ms": 140,
|
||||
},
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_event_counts_completed_tool_items() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::HookRunFact;
|
||||
@@ -17,8 +18,10 @@ use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationTimingFact;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnTimingFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
@@ -163,12 +166,34 @@ impl AnalyticsEventsClient {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_app_server_started(&self, duration_ms: u64, remote_control_enabled: bool) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::AppServerStarted(AppServerStartedInput {
|
||||
runtime: current_runtime_metadata(),
|
||||
remote_control_enabled,
|
||||
duration_ms,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_subagent_thread_started(&self, input: SubAgentThreadStartedInput) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_thread_initialization_timing(&self, fact: ThreadInitializationTimingFact) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::ThreadInitializationTiming(fact),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_turn_timing(&self, fact: TurnTimingFact) {
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnTiming(
|
||||
Box::new(fact),
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_guardian_review(
|
||||
&self,
|
||||
tracking: &GuardianReviewTrackContext,
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::time::Instant;
|
||||
|
||||
use crate::facts::AcceptedLineFingerprint;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CompactionImplementation;
|
||||
use crate::facts::CompactionPhase;
|
||||
@@ -14,11 +15,13 @@ use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationMode;
|
||||
use crate::facts::ThreadInitializationTimingFact;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRejectionReason;
|
||||
use crate::facts::TurnSteerResult;
|
||||
use crate::facts::TurnSubmissionType;
|
||||
use crate::facts::TurnTimingFact;
|
||||
use crate::now_unix_millis;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::CommandExecutionSource;
|
||||
@@ -56,13 +59,16 @@ pub(crate) struct TrackEventsRequest {
|
||||
#[serde(untagged)]
|
||||
pub(crate) enum TrackEventRequest {
|
||||
SkillInvocation(SkillInvocationEventRequest),
|
||||
AppServerStarted(AppServerStartedEventRequest),
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
ThreadInitializationTiming(ThreadInitializationTimingEventRequest),
|
||||
GuardianReview(Box<GuardianReviewEventRequest>),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
HookRun(CodexHookRunEventRequest),
|
||||
Compaction(Box<CodexCompactionEventRequest>),
|
||||
TurnEvent(Box<CodexTurnEventRequest>),
|
||||
TurnTiming(TurnTimingEventRequest),
|
||||
TurnSteer(CodexTurnSteerEventRequest),
|
||||
CommandExecution(CodexCommandExecutionEventRequest),
|
||||
FileChange(CodexFileChangeEventRequest),
|
||||
@@ -144,6 +150,12 @@ pub(crate) struct CodexRuntimeMetadata {
|
||||
pub(crate) runtime_arch: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct AppServerStartedEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: AppServerStartedInput,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializedEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
@@ -165,6 +177,12 @@ pub(crate) struct ThreadInitializedEvent {
|
||||
pub(crate) event_params: ThreadInitializedEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializationTimingEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: ThreadInitializationTimingFact,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct GuardianReviewEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
@@ -822,6 +840,12 @@ pub(crate) struct CodexTurnEventRequest {
|
||||
pub(crate) event_params: CodexTurnEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct TurnTimingEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: TurnTimingFact,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexTurnSteerEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
|
||||
@@ -88,6 +88,7 @@ pub struct TurnResolvedConfigFact {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ThreadInitializationMode {
|
||||
New,
|
||||
Cleared,
|
||||
Forked,
|
||||
Resumed,
|
||||
}
|
||||
@@ -99,6 +100,15 @@ pub struct TurnTokenUsageFact {
|
||||
pub token_usage: TokenUsage,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct TurnTimingFact {
|
||||
pub turn_id: String,
|
||||
pub thread_id: String,
|
||||
pub request_start_delay_ms: Option<u64>,
|
||||
pub sampling_duration_ms: u64,
|
||||
pub blocking_tool_critical_path_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TurnStatus {
|
||||
@@ -275,6 +285,23 @@ pub struct CodexCompactionEvent {
|
||||
pub duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct AppServerStartedInput {
|
||||
pub runtime: CodexRuntimeMetadata,
|
||||
pub remote_control_enabled: bool,
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ThreadInitializationTimingFact {
|
||||
pub thread_id: String,
|
||||
pub initialization_mode: ThreadInitializationMode,
|
||||
pub duration_ms: u64,
|
||||
pub prepare_duration_ms: u64,
|
||||
pub spawn_duration_ms: u64,
|
||||
pub finalize_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum AnalyticsFact {
|
||||
Initialize {
|
||||
@@ -324,10 +351,13 @@ pub(crate) enum AnalyticsFact {
|
||||
}
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
AppServerStarted(AppServerStartedInput),
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
ThreadInitializationTiming(ThreadInitializationTimingFact),
|
||||
Compaction(Box<CodexCompactionEvent>),
|
||||
GuardianReview(Box<GuardianReviewEventParams>),
|
||||
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
|
||||
TurnTiming(Box<TurnTimingFact>),
|
||||
TurnTokenUsage(Box<TurnTokenUsageFact>),
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
|
||||
@@ -37,12 +37,14 @@ pub use facts::InvocationType;
|
||||
pub use facts::SkillInvocation;
|
||||
pub use facts::SubAgentThreadStartedInput;
|
||||
pub use facts::ThreadInitializationMode;
|
||||
pub use facts::ThreadInitializationTimingFact;
|
||||
pub use facts::TrackEventsContext;
|
||||
pub use facts::TurnResolvedConfigFact;
|
||||
pub use facts::TurnStatus;
|
||||
pub use facts::TurnSteerRejectionReason;
|
||||
pub use facts::TurnSteerRequestError;
|
||||
pub use facts::TurnSteerResult;
|
||||
pub use facts::TurnTimingFact;
|
||||
pub use facts::TurnTokenUsageFact;
|
||||
pub use facts::build_track_events_context;
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::accepted_lines::accepted_line_fingerprint_event_requests;
|
||||
use crate::accepted_lines::accepted_line_fingerprints_from_unified_diff;
|
||||
use crate::accepted_lines::accepted_line_repo_hash_for_cwd;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::AppServerStartedEventRequest;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
@@ -43,11 +44,13 @@ use crate::events::ReviewTrigger;
|
||||
use crate::events::Reviewer;
|
||||
use crate::events::SkillInvocationEventParams;
|
||||
use crate::events::SkillInvocationEventRequest;
|
||||
use crate::events::ThreadInitializationTimingEventRequest;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::ToolItemFailureKind;
|
||||
use crate::events::ToolItemTerminalStatus;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::TurnTimingEventRequest;
|
||||
use crate::events::WebSearchActionKind;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_compaction_event_params;
|
||||
@@ -61,6 +64,7 @@ use crate::events::subagent_thread_started_event_request;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
@@ -71,10 +75,12 @@ use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationMode;
|
||||
use crate::facts::ThreadInitializationTimingFact;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRejectionReason;
|
||||
use crate::facts::TurnSteerResult;
|
||||
use crate::facts::TurnTimingFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::now_unix_seconds;
|
||||
use crate::option_i64_to_u64;
|
||||
@@ -449,9 +455,15 @@ impl AnalyticsReducer {
|
||||
self.ingest_server_request_aborted(completed_at_ms, request_id, out);
|
||||
}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::AppServerStarted(input) => {
|
||||
self.ingest_app_server_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::ThreadInitializationTiming(input) => {
|
||||
Self::ingest_thread_initialization_timing(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::Compaction(input) => {
|
||||
self.ingest_compaction(*input, out);
|
||||
}
|
||||
@@ -461,6 +473,9 @@ impl AnalyticsReducer {
|
||||
CustomAnalyticsFact::TurnResolvedConfig(input) => {
|
||||
self.ingest_turn_resolved_config(*input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::TurnTiming(input) => {
|
||||
Self::ingest_turn_timing(*input, out);
|
||||
}
|
||||
CustomAnalyticsFact::TurnTokenUsage(input) => {
|
||||
self.ingest_turn_token_usage(*input, out).await;
|
||||
}
|
||||
@@ -511,6 +526,19 @@ impl AnalyticsReducer {
|
||||
);
|
||||
}
|
||||
|
||||
fn ingest_app_server_started(
|
||||
&mut self,
|
||||
input: AppServerStartedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(TrackEventRequest::AppServerStarted(
|
||||
AppServerStartedEventRequest {
|
||||
event_type: "app_server_started",
|
||||
event_params: input,
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_subagent_thread_started(
|
||||
&mut self,
|
||||
input: SubAgentThreadStartedInput,
|
||||
@@ -542,6 +570,18 @@ impl AnalyticsReducer {
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_thread_initialization_timing(
|
||||
input: ThreadInitializationTimingFact,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(TrackEventRequest::ThreadInitializationTiming(
|
||||
ThreadInitializationTimingEventRequest {
|
||||
event_type: "thread_initialization_timing",
|
||||
event_params: input,
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_guardian_review(
|
||||
&mut self,
|
||||
input: GuardianReviewEventParams,
|
||||
@@ -645,6 +685,13 @@ impl AnalyticsReducer {
|
||||
self.maybe_emit_turn_event(&turn_id, out).await;
|
||||
}
|
||||
|
||||
fn ingest_turn_timing(input: TurnTimingFact, out: &mut Vec<TrackEventRequest>) {
|
||||
out.push(TrackEventRequest::TurnTiming(TurnTimingEventRequest {
|
||||
event_type: "turn_timing",
|
||||
event_params: input,
|
||||
}));
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
@@ -428,6 +429,7 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let app_server_started_at = Instant::now();
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -789,6 +791,12 @@ pub async fn run_main_with_transport_options(
|
||||
let auth_manager = Arc::clone(&auth_manager);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let duration_ms = app_server_started_at
|
||||
.elapsed()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap_or(u64::MAX);
|
||||
analytics_events_client.track_app_server_started(duration_ms, remote_control_enabled);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
|
||||
@@ -21,6 +21,12 @@ use wiremock::matchers::path;
|
||||
|
||||
const SERVICE_VERSION: &str = "0.0.0-test";
|
||||
|
||||
#[cfg(any(target_os = "macos", windows))]
|
||||
pub(crate) const ANALYTICS_TEST_TIMEOUT: Duration = Duration::from_secs(25);
|
||||
|
||||
#[cfg(not(any(target_os = "macos", windows)))]
|
||||
pub(crate) const ANALYTICS_TEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
fn set_metrics_exporter(config: &mut codex_core::config::Config) {
|
||||
config.otel.metrics_exporter = OtelExporterKind::OtlpHttp {
|
||||
endpoint: "http://localhost:4318".to_string(),
|
||||
@@ -98,28 +104,6 @@ pub(crate) async fn mount_analytics_capture(server: &MockServer, codex_home: &Pa
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_payload(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
) -> Result<Value> {
|
||||
let body = timeout(read_timeout, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
|
||||
}) {
|
||||
break request.body.clone();
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_event(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
@@ -133,7 +117,10 @@ pub(crate) async fn wait_for_analytics_event(
|
||||
};
|
||||
for request in &requests {
|
||||
if request.method != "POST"
|
||||
|| request.url.path() != "/codex/analytics-events/events"
|
||||
|| !request
|
||||
.url
|
||||
.path()
|
||||
.ends_with("/codex/analytics-events/events")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@@ -155,16 +142,6 @@ pub(crate) async fn wait_for_analytics_event(
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> {
|
||||
let events = payload["events"]
|
||||
.as_array()
|
||||
.ok_or_else(|| anyhow::anyhow!("analytics payload missing events array"))?;
|
||||
events
|
||||
.iter()
|
||||
.find(|event| event["event_type"] == "codex_thread_initialized")
|
||||
.ok_or_else(|| anyhow::anyhow!("codex_thread_initialized event should be present"))
|
||||
}
|
||||
|
||||
pub(crate) fn assert_basic_thread_initialized_event(
|
||||
event: &Value,
|
||||
thread_id: &str,
|
||||
@@ -207,3 +184,29 @@ pub(crate) fn assert_basic_thread_initialized_event(
|
||||
);
|
||||
assert!(event["event_params"]["created_at"].as_u64().is_some());
|
||||
}
|
||||
|
||||
pub(crate) fn assert_thread_initialization_timing_event(
|
||||
event: &Value,
|
||||
thread_id: &str,
|
||||
initialization_mode: &str,
|
||||
) {
|
||||
let durations_are_numbers = [
|
||||
"duration_ms",
|
||||
"prepare_duration_ms",
|
||||
"spawn_duration_ms",
|
||||
"finalize_duration_ms",
|
||||
]
|
||||
.map(|field| event["event_params"][field].as_u64().is_some());
|
||||
assert_eq!(
|
||||
serde_json::json!({
|
||||
"thread_id": event["event_params"]["thread_id"],
|
||||
"initialization_mode": event["event_params"]["initialization_mode"],
|
||||
"durations_are_numbers": durations_are_numbers,
|
||||
}),
|
||||
serde_json::json!({
|
||||
"thread_id": thread_id,
|
||||
"initialization_mode": initialization_mode,
|
||||
"durations_are_numbers": [true, true, true, true],
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -58,6 +58,8 @@ use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::query_param;
|
||||
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
// Plugin install tests wait on connector discovery after the install response path
|
||||
// starts, which is noticeably slower on Windows CI.
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
@@ -727,22 +729,22 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
|
||||
let response: PluginInstallResponse = to_response(response)?;
|
||||
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
|
||||
|
||||
let payload = wait_for_plugin_analytics_payload(&analytics_server).await?;
|
||||
let event =
|
||||
wait_for_analytics_event(&analytics_server, DEFAULT_TIMEOUT, "codex_plugin_installed")
|
||||
.await?;
|
||||
assert_eq!(
|
||||
payload,
|
||||
event,
|
||||
json!({
|
||||
"events": [{
|
||||
"event_type": "codex_plugin_installed",
|
||||
"event_params": {
|
||||
"plugin_id": "sample-plugin@debug",
|
||||
"plugin_name": "sample-plugin",
|
||||
"marketplace_name": "debug",
|
||||
"has_skills": false,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
}]
|
||||
"event_type": "codex_plugin_installed",
|
||||
"event_params": {
|
||||
"plugin_id": "sample-plugin@debug",
|
||||
"plugin_name": "sample-plugin",
|
||||
"marketplace_name": "debug",
|
||||
"has_skills": false,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
@@ -780,22 +782,21 @@ async fn plugin_install_tracks_remote_plugin_analytics_event() -> Result<()> {
|
||||
let response: PluginInstallResponse = to_response(response)?;
|
||||
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
|
||||
|
||||
let payload = wait_for_plugin_analytics_payload(&server).await?;
|
||||
let event =
|
||||
wait_for_analytics_event(&server, DEFAULT_TIMEOUT, "codex_plugin_installed").await?;
|
||||
assert_eq!(
|
||||
payload,
|
||||
event,
|
||||
json!({
|
||||
"events": [{
|
||||
"event_type": "codex_plugin_installed",
|
||||
"event_params": {
|
||||
"plugin_id": REMOTE_PLUGIN_ID,
|
||||
"plugin_name": "linear",
|
||||
"marketplace_name": "openai-curated-remote",
|
||||
"has_skills": true,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
}]
|
||||
"event_type": "codex_plugin_installed",
|
||||
"event_params": {
|
||||
"plugin_id": REMOTE_PLUGIN_ID,
|
||||
"plugin_name": "linear",
|
||||
"marketplace_name": "openai-curated-remote",
|
||||
"has_skills": true,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
@@ -1288,29 +1289,6 @@ async fn mount_backend_analytics_events(server: &MockServer) {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn wait_for_plugin_analytics_payload(server: &MockServer) -> Result<serde_json::Value> {
|
||||
timeout(DEFAULT_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
request.method == "POST"
|
||||
&& request
|
||||
.url
|
||||
.path()
|
||||
.ends_with("/codex/analytics-events/events")
|
||||
}) {
|
||||
return serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"));
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
fn write_remote_plugin_catalog_config(
|
||||
codex_home: &std::path::Path,
|
||||
base_url: &str,
|
||||
|
||||
@@ -383,7 +383,13 @@ plugin_sharing = false
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("wiremock should record requests")
|
||||
.is_empty()
|
||||
.iter()
|
||||
.all(|request| {
|
||||
request
|
||||
.url
|
||||
.path()
|
||||
.ends_with("/codex/analytics-events/events")
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -24,6 +24,8 @@ use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_PLUGIN_ID: &str = "plugins~Plugin_linear";
|
||||
const WORKSPACE_REMOTE_PLUGIN_ID: &str = "plugins_69f27c3e67848191a45cbaa5f2adb39d";
|
||||
@@ -116,37 +118,25 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
|
||||
let response: PluginUninstallResponse = to_response(response)?;
|
||||
assert_eq!(response, PluginUninstallResponse {});
|
||||
|
||||
let payload = timeout(DEFAULT_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = analytics_server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
|
||||
}) {
|
||||
break request.body.clone();
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
let event = wait_for_analytics_event(
|
||||
&analytics_server,
|
||||
DEFAULT_TIMEOUT,
|
||||
"codex_plugin_uninstalled",
|
||||
)
|
||||
.await?;
|
||||
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
|
||||
assert_eq!(
|
||||
payload,
|
||||
event,
|
||||
json!({
|
||||
"events": [{
|
||||
"event_type": "codex_plugin_uninstalled",
|
||||
"event_params": {
|
||||
"plugin_id": "sample-plugin@debug",
|
||||
"plugin_name": "sample-plugin",
|
||||
"marketplace_name": "debug",
|
||||
"has_skills": false,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
}]
|
||||
"event_type": "codex_plugin_uninstalled",
|
||||
"event_params": {
|
||||
"plugin_id": "sample-plugin@debug",
|
||||
"plugin_name": "sample-plugin",
|
||||
"marketplace_name": "debug",
|
||||
"has_skills": false,
|
||||
"mcp_server_count": 0,
|
||||
"connector_ids": [],
|
||||
"product_client_id": DEFAULT_CLIENT_NAME,
|
||||
}
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
|
||||
@@ -17,6 +17,8 @@ use codex_config::types::AuthCredentialsStoreMode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -150,8 +152,12 @@ impl BlockingRemoteControlBackend {
|
||||
let (enroll_request_tx, enroll_request_rx) = oneshot::channel();
|
||||
let server_task = tokio::spawn(async move {
|
||||
match read_enroll_request(listener).await {
|
||||
Ok((request_line, _reader)) => {
|
||||
Ok(PendingHttpRequest {
|
||||
request_line,
|
||||
connection,
|
||||
}) => {
|
||||
let _ = enroll_request_tx.send(Ok(request_line));
|
||||
let _connection = connection;
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -181,20 +187,48 @@ impl Drop for BlockingRemoteControlBackend {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_enroll_request(listener: TcpListener) -> Result<(String, BufReader<TcpStream>)> {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
let mut reader = BufReader::new(stream);
|
||||
|
||||
let mut request_line = String::new();
|
||||
reader.read_line(&mut request_line).await?;
|
||||
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).await?;
|
||||
if line == "\r\n" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((request_line.trim_end().to_string(), reader))
|
||||
struct PendingHttpRequest {
|
||||
request_line: String,
|
||||
connection: BufReader<TcpStream>,
|
||||
}
|
||||
|
||||
async fn read_enroll_request(listener: TcpListener) -> Result<PendingHttpRequest> {
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
let mut reader = BufReader::new(stream);
|
||||
|
||||
let mut request_line = String::new();
|
||||
reader.read_line(&mut request_line).await?;
|
||||
|
||||
let mut content_length = 0usize;
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).await?;
|
||||
if let Some(value) = line.strip_prefix("Content-Length: ") {
|
||||
content_length = value.trim().parse().unwrap_or_default();
|
||||
}
|
||||
if line == "\r\n" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if content_length > 0 {
|
||||
let mut body = vec![0; content_length];
|
||||
reader.read_exact(&mut body).await?;
|
||||
}
|
||||
|
||||
let request_line = request_line.trim_end().to_string();
|
||||
if request_line.contains("/codex/analytics-events/events ") {
|
||||
reader
|
||||
.get_mut()
|
||||
.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(PendingHttpRequest {
|
||||
request_line,
|
||||
connection: reader,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,10 +41,11 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::analytics::ANALYTICS_TEST_TIMEOUT;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::assert_thread_initialization_timing_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
@@ -403,16 +404,24 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
let event =
|
||||
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
|
||||
.await?;
|
||||
assert_basic_thread_initialized_event(
|
||||
event,
|
||||
&event,
|
||||
&thread.id,
|
||||
&thread.session_id,
|
||||
"mock-model",
|
||||
"forked",
|
||||
"user",
|
||||
);
|
||||
let timing_event = wait_for_analytics_event(
|
||||
&server,
|
||||
ANALYTICS_TEST_TIMEOUT,
|
||||
"thread_initialization_timing",
|
||||
)
|
||||
.await?;
|
||||
assert_thread_initialization_timing_event(&timing_event, &thread.id, "forked");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -93,10 +93,11 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::analytics::ANALYTICS_TEST_TIMEOUT;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::assert_thread_initialization_timing_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
@@ -420,16 +421,24 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
);
|
||||
assert_eq!(thread.thread_source, Some(ThreadSource::User));
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
let event =
|
||||
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
|
||||
.await?;
|
||||
assert_basic_thread_initialized_event(
|
||||
event,
|
||||
&event,
|
||||
&thread.id,
|
||||
&thread.session_id,
|
||||
"gpt-5.3-codex",
|
||||
"resumed",
|
||||
"user",
|
||||
);
|
||||
let timing_event = wait_for_analytics_event(
|
||||
&server,
|
||||
ANALYTICS_TEST_TIMEOUT,
|
||||
"thread_initialization_timing",
|
||||
)
|
||||
.await?;
|
||||
assert_thread_initialization_timing_event(&timing_event, &thread.id, "resumed");
|
||||
assert_eq!(event["event_params"]["thread_source"], "user");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -44,10 +44,11 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
use super::analytics::ANALYTICS_TEST_TIMEOUT;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::assert_thread_initialization_timing_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
@@ -436,17 +437,24 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
assert_eq!(payload["events"].as_array().expect("events array").len(), 1);
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
let event =
|
||||
wait_for_analytics_event(&server, ANALYTICS_TEST_TIMEOUT, "codex_thread_initialized")
|
||||
.await?;
|
||||
assert_basic_thread_initialized_event(
|
||||
event,
|
||||
&event,
|
||||
&thread.id,
|
||||
&thread.session_id,
|
||||
"mock-model",
|
||||
"new",
|
||||
"user",
|
||||
);
|
||||
let timing_event = wait_for_analytics_event(
|
||||
&server,
|
||||
ANALYTICS_TEST_TIMEOUT,
|
||||
"thread_initialization_timing",
|
||||
)
|
||||
.await?;
|
||||
assert_thread_initialization_timing_event(&timing_event, &thread.id, "new");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
@@ -25,6 +26,9 @@ use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::wait_for_analytics_event;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
|
||||
@@ -55,7 +59,12 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
"call_sleep",
|
||||
)?])
|
||||
.await;
|
||||
create_config_toml(&codex_home, &server.uri(), "never", "workspace-write")?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
&codex_home,
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
mount_analytics_capture(&server, &codex_home).await?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -124,6 +133,29 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
)?;
|
||||
assert_eq!(completed.thread_id, thread_id);
|
||||
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
|
||||
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "turn_timing").await?;
|
||||
assert_eq!(
|
||||
(
|
||||
event["event_params"]["thread_id"].as_str(),
|
||||
event["event_params"]["turn_id"].as_str(),
|
||||
event["event_params"]["request_start_delay_ms"]
|
||||
.as_u64()
|
||||
.is_some(),
|
||||
event["event_params"]["sampling_duration_ms"]
|
||||
.as_u64()
|
||||
.is_some(),
|
||||
event["event_params"]["blocking_tool_critical_path_duration_ms"]
|
||||
.as_u64()
|
||||
.is_some_and(|duration_ms| duration_ms > 0),
|
||||
),
|
||||
(
|
||||
Some(thread_id.as_str()),
|
||||
Some(turn_id.as_str()),
|
||||
true,
|
||||
true,
|
||||
true
|
||||
)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -137,6 +137,7 @@ pub mod spawn;
|
||||
pub(crate) mod state_db_bridge;
|
||||
pub use state_db_bridge::StateDbHandle;
|
||||
pub use state_db_bridge::init_state_db;
|
||||
mod thread_initialization_timing;
|
||||
mod thread_rollout_truncation;
|
||||
mod tools;
|
||||
pub(crate) mod turn_diff_tracker;
|
||||
|
||||
@@ -1706,6 +1706,7 @@ async fn try_run_sampling_request(
|
||||
turn_context.model_info.slug.as_str(),
|
||||
turn_context.provider.info().name.as_str(),
|
||||
);
|
||||
turn_context.turn_timing_state.mark_sampling_started().await;
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
prompt,
|
||||
@@ -2124,6 +2125,10 @@ async fn try_run_sampling_request(
|
||||
&mut assistant_message_stream_parsers,
|
||||
)
|
||||
.await;
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_sampling_completed()
|
||||
.await;
|
||||
|
||||
if sess
|
||||
.features
|
||||
@@ -2134,7 +2139,15 @@ async fn try_run_sampling_request(
|
||||
client_session.send_response_processed(response_id).await;
|
||||
}
|
||||
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_blocking_tool_critical_path_started()
|
||||
.await;
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_blocking_tool_critical_path_completed()
|
||||
.await;
|
||||
|
||||
if should_emit_token_count {
|
||||
// A tool call such as request_user_input can intentionally pause the turn. Emit token
|
||||
|
||||
@@ -6,8 +6,9 @@ mod user_shell;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_analytics::TurnTimingFact;
|
||||
use codex_analytics::TurnTokenUsageFact;
|
||||
use codex_extension_api::ExtensionData;
|
||||
use futures::future::BoxFuture;
|
||||
use tokio::select;
|
||||
@@ -33,7 +34,6 @@ use crate::session::turn_context::TurnContext;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use codex_analytics::TurnTokenUsageFact;
|
||||
use codex_login::AuthManager;
|
||||
use codex_models_manager::manager::SharedModelsManager;
|
||||
use codex_otel::SessionTelemetry;
|
||||
@@ -319,11 +319,7 @@ impl Session {
|
||||
let task: Arc<dyn AnySessionTask> = Arc::new(task);
|
||||
let task_kind = task.kind();
|
||||
let span_name = task.span_name();
|
||||
let started_at = Instant::now();
|
||||
let turn_started_at_unix_ms = turn_context
|
||||
.turn_timing_state
|
||||
.mark_turn_started(started_at)
|
||||
.await;
|
||||
let turn_started_at_unix_ms = turn_context.turn_timing_state.mark_turn_started().await;
|
||||
turn_context
|
||||
.turn_metadata_state
|
||||
.set_turn_started_at_unix_ms(turn_started_at_unix_ms);
|
||||
@@ -763,6 +759,7 @@ impl Session {
|
||||
.turn_timing_state
|
||||
.time_to_first_token_ms()
|
||||
.await;
|
||||
self.track_turn_timing(turn_context.as_ref()).await;
|
||||
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
|
||||
.await;
|
||||
if let Err(err) = self
|
||||
@@ -823,6 +820,20 @@ impl Session {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn track_turn_timing(&self, turn_context: &TurnContext) {
|
||||
let timing = turn_context.turn_timing_state.timing_breakdown().await;
|
||||
self.services
|
||||
.analytics_events_client
|
||||
.track_turn_timing(TurnTimingFact {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
thread_id: self.conversation_id.to_string(),
|
||||
request_start_delay_ms: timing.request_start_delay_ms,
|
||||
sampling_duration_ms: timing.sampling_duration_ms,
|
||||
blocking_tool_critical_path_duration_ms: timing
|
||||
.blocking_tool_critical_path_duration_ms,
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
@@ -875,6 +886,7 @@ impl Session {
|
||||
.turn_timing_state
|
||||
.completed_at_and_duration_ms()
|
||||
.await;
|
||||
self.track_turn_timing(task.turn_context.as_ref()).await;
|
||||
let event = EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(task.turn_context.sub_id.clone()),
|
||||
reason,
|
||||
|
||||
59
codex-rs/core/src/thread_initialization_timing.rs
Normal file
59
codex-rs/core/src/thread_initialization_timing.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_analytics::ThreadInitializationMode;
|
||||
use codex_analytics::ThreadInitializationTimingFact;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ThreadInitializationTiming {
|
||||
started_at: Instant,
|
||||
phase_started_at: Instant,
|
||||
prepare_duration_ms: u64,
|
||||
spawn_duration_ms: u64,
|
||||
}
|
||||
|
||||
impl ThreadInitializationTiming {
|
||||
pub(crate) fn start() -> Self {
|
||||
let started_at = Instant::now();
|
||||
Self {
|
||||
started_at,
|
||||
phase_started_at: started_at,
|
||||
prepare_duration_ms: 0,
|
||||
spawn_duration_ms: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn mark_prepare_completed(&mut self) {
|
||||
self.prepare_duration_ms = self.finish_phase();
|
||||
}
|
||||
|
||||
pub(crate) fn mark_spawn_completed(&mut self) {
|
||||
self.spawn_duration_ms = self.finish_phase();
|
||||
}
|
||||
|
||||
pub(crate) fn finish(
|
||||
mut self,
|
||||
thread_id: String,
|
||||
initialization_mode: ThreadInitializationMode,
|
||||
) -> ThreadInitializationTimingFact {
|
||||
let finalize_duration_ms = self.finish_phase();
|
||||
ThreadInitializationTimingFact {
|
||||
thread_id,
|
||||
initialization_mode,
|
||||
duration_ms: duration_ms(self.started_at.elapsed()),
|
||||
prepare_duration_ms: self.prepare_duration_ms,
|
||||
spawn_duration_ms: self.spawn_duration_ms,
|
||||
finalize_duration_ms,
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_phase(&mut self) -> u64 {
|
||||
let duration_ms = duration_ms(self.phase_started_at.elapsed());
|
||||
self.phase_started_at = Instant::now();
|
||||
duration_ms
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_ms(duration: Duration) -> u64 {
|
||||
duration.as_millis().try_into().unwrap_or(u64::MAX)
|
||||
}
|
||||
@@ -15,7 +15,9 @@ use crate::session::INITIAL_SUBMIT_ID;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::tasks::InterruptedTurnHistoryMarker;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use crate::thread_initialization_timing::ThreadInitializationTiming;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::ThreadInitializationMode;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
@@ -1219,6 +1221,13 @@ impl ThreadManagerState {
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let is_resumed_thread = matches!(&initial_history, InitialHistory::Resumed(_));
|
||||
let initialization_mode = match &initial_history {
|
||||
InitialHistory::New => ThreadInitializationMode::New,
|
||||
InitialHistory::Cleared => ThreadInitializationMode::Cleared,
|
||||
InitialHistory::Forked(_) => ThreadInitializationMode::Forked,
|
||||
InitialHistory::Resumed(_) => ThreadInitializationMode::Resumed,
|
||||
};
|
||||
let mut initialization_timing = ThreadInitializationTiming::start();
|
||||
if let InitialHistory::Resumed(resumed) = &initial_history {
|
||||
let mut threads = self.threads.write().await;
|
||||
if let Some(thread) = threads.get(&resumed.conversation_id).cloned() {
|
||||
@@ -1231,6 +1240,13 @@ impl ThreadManagerState {
|
||||
resumed.conversation_id
|
||||
)));
|
||||
}
|
||||
initialization_timing.mark_prepare_completed();
|
||||
if let Some(analytics_events_client) = self.analytics_events_client.as_ref() {
|
||||
analytics_events_client.track_thread_initialization_timing(
|
||||
initialization_timing
|
||||
.finish(resumed.conversation_id.to_string(), initialization_mode),
|
||||
);
|
||||
}
|
||||
return Ok(NewThread {
|
||||
thread_id: resumed.conversation_id,
|
||||
session_configured: thread.session_configured(),
|
||||
@@ -1245,6 +1261,7 @@ impl ThreadManagerState {
|
||||
let parent_rollout_thread_trace = self
|
||||
.parent_rollout_thread_trace_for_source(&session_source, &initial_history)
|
||||
.await;
|
||||
initialization_timing.mark_prepare_completed();
|
||||
let tracked_session_source = session_source.clone();
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
@@ -1277,9 +1294,15 @@ impl ThreadManagerState {
|
||||
attestation_provider: self.attestation_provider.clone(),
|
||||
})
|
||||
.await?;
|
||||
initialization_timing.mark_spawn_completed();
|
||||
let new_thread = self
|
||||
.finalize_thread_spawn(codex, thread_id, tracked_session_source)
|
||||
.await?;
|
||||
if let Some(analytics_events_client) = self.analytics_events_client.as_ref() {
|
||||
analytics_events_client.track_thread_initialization_timing(
|
||||
initialization_timing.finish(new_thread.thread_id.to_string(), initialization_mode),
|
||||
);
|
||||
}
|
||||
if is_resumed_thread {
|
||||
new_thread.thread.emit_thread_resume_lifecycle().await;
|
||||
if let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await {
|
||||
|
||||
@@ -36,6 +36,13 @@ pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &T
|
||||
.record_duration(TURN_TTFM_DURATION_METRIC, duration, &[]);
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub(crate) struct TurnTimingBreakdown {
|
||||
pub(crate) request_start_delay_ms: Option<u64>,
|
||||
pub(crate) sampling_duration_ms: u64,
|
||||
pub(crate) blocking_tool_critical_path_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct TurnTimingState {
|
||||
state: Mutex<TurnTimingStateInner>,
|
||||
@@ -47,16 +54,27 @@ struct TurnTimingStateInner {
|
||||
started_at_unix_secs: Option<i64>,
|
||||
first_token_at: Option<Instant>,
|
||||
first_message_at: Option<Instant>,
|
||||
first_request_started_at: Option<Instant>,
|
||||
sampling_started_at: Option<Instant>,
|
||||
sampling_duration: Duration,
|
||||
blocking_tool_critical_path_started_at: Option<Instant>,
|
||||
blocking_tool_critical_path_duration: Duration,
|
||||
}
|
||||
|
||||
impl TurnTimingState {
|
||||
pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 {
|
||||
pub(crate) async fn mark_turn_started(&self) -> i64 {
|
||||
let started_at = Instant::now();
|
||||
let started_at_unix_ms = now_unix_timestamp_ms();
|
||||
let mut state = self.state.lock().await;
|
||||
state.started_at = Some(started_at);
|
||||
state.started_at_unix_secs = Some(started_at_unix_ms / 1000);
|
||||
state.first_token_at = None;
|
||||
state.first_message_at = None;
|
||||
state.first_request_started_at = None;
|
||||
state.sampling_started_at = None;
|
||||
state.sampling_duration = Duration::default();
|
||||
state.blocking_tool_critical_path_started_at = None;
|
||||
state.blocking_tool_critical_path_duration = Duration::default();
|
||||
started_at_unix_ms
|
||||
}
|
||||
|
||||
@@ -98,6 +116,63 @@ impl TurnTimingState {
|
||||
let mut state = self.state.lock().await;
|
||||
state.record_turn_ttfm()
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_sampling_started(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.first_request_started_at.is_none() {
|
||||
state.first_request_started_at = Some(Instant::now());
|
||||
}
|
||||
if state.sampling_started_at.is_none() {
|
||||
state.sampling_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_sampling_completed(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if let Some(started_at) = state.sampling_started_at.take() {
|
||||
state.sampling_duration = state.sampling_duration.saturating_add(started_at.elapsed());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_blocking_tool_critical_path_started(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.blocking_tool_critical_path_started_at.is_none() {
|
||||
state.blocking_tool_critical_path_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_blocking_tool_critical_path_completed(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if let Some(started_at) = state.blocking_tool_critical_path_started_at.take() {
|
||||
state.blocking_tool_critical_path_duration = state
|
||||
.blocking_tool_critical_path_duration
|
||||
.saturating_add(started_at.elapsed());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn timing_breakdown(&self) -> TurnTimingBreakdown {
|
||||
let state = self.state.lock().await;
|
||||
let sampling_duration = state.sampling_duration.saturating_add(
|
||||
state
|
||||
.sampling_started_at
|
||||
.map(|started_at| started_at.elapsed())
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
let blocking_tool_critical_path_duration =
|
||||
state.blocking_tool_critical_path_duration.saturating_add(
|
||||
state
|
||||
.blocking_tool_critical_path_started_at
|
||||
.map(|started_at| started_at.elapsed())
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
TurnTimingBreakdown {
|
||||
request_start_delay_ms: state.request_start_delay_ms(),
|
||||
sampling_duration_ms: duration_to_u64_millis(sampling_duration),
|
||||
blocking_tool_critical_path_duration_ms: duration_to_u64_millis(
|
||||
blocking_tool_critical_path_duration,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn now_unix_timestamp_secs() -> i64 {
|
||||
@@ -112,6 +187,13 @@ pub(crate) fn now_unix_timestamp_ms() -> i64 {
|
||||
}
|
||||
|
||||
impl TurnTimingStateInner {
|
||||
fn request_start_delay_ms(&self) -> Option<u64> {
|
||||
let duration = self
|
||||
.first_request_started_at?
|
||||
.duration_since(self.started_at?);
|
||||
Some(duration_to_u64_millis(duration))
|
||||
}
|
||||
|
||||
fn time_to_first_token(&self) -> Option<Duration> {
|
||||
Some(self.first_token_at?.duration_since(self.started_at?))
|
||||
}
|
||||
@@ -136,6 +218,10 @@ impl TurnTimingStateInner {
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_to_u64_millis(duration: Duration) -> u64 {
|
||||
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
|
||||
}
|
||||
|
||||
fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
|
||||
match event {
|
||||
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {
|
||||
|
||||
@@ -4,9 +4,10 @@ use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Instant;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use super::TurnTimingState;
|
||||
use super::response_item_records_turn_ttft;
|
||||
@@ -22,7 +23,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
|
||||
None
|
||||
);
|
||||
|
||||
state.mark_turn_started(Instant::now()).await;
|
||||
state.mark_turn_started().await;
|
||||
assert_eq!(
|
||||
state
|
||||
.record_ttft_for_response_event(&ResponseEvent::Created)
|
||||
@@ -46,7 +47,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
|
||||
#[tokio::test]
|
||||
async fn turn_timing_state_records_ttfm_independently_of_ttft() {
|
||||
let state = TurnTimingState::default();
|
||||
state.mark_turn_started(Instant::now()).await;
|
||||
state.mark_turn_started().await;
|
||||
|
||||
assert!(
|
||||
state
|
||||
@@ -86,7 +87,7 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
|
||||
.expect("system time should be after unix epoch")
|
||||
.as_millis();
|
||||
|
||||
let started_at_unix_ms = state.mark_turn_started(Instant::now()).await;
|
||||
let started_at_unix_ms = state.mark_turn_started().await;
|
||||
|
||||
let after = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
@@ -99,6 +100,51 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_timing_state_tracks_request_start_and_duration_breakdown() {
|
||||
let state = TurnTimingState::default();
|
||||
state.mark_turn_started().await;
|
||||
state.mark_sampling_started().await;
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
state.mark_sampling_completed().await;
|
||||
state.mark_sampling_started().await;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
state.mark_sampling_completed().await;
|
||||
state.mark_blocking_tool_critical_path_started().await;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
state.mark_blocking_tool_critical_path_completed().await;
|
||||
|
||||
let breakdown = state.timing_breakdown().await;
|
||||
|
||||
assert_eq!(
|
||||
(
|
||||
breakdown.sampling_duration_ms >= 15,
|
||||
breakdown.blocking_tool_critical_path_duration_ms >= 5,
|
||||
breakdown.request_start_delay_ms.is_some(),
|
||||
),
|
||||
(true, true, true)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_timing_state_snapshots_active_intervals() {
|
||||
let state = TurnTimingState::default();
|
||||
state.mark_turn_started().await;
|
||||
state.mark_sampling_started().await;
|
||||
state.mark_blocking_tool_critical_path_started().await;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
|
||||
let breakdown = state.timing_breakdown().await;
|
||||
|
||||
assert_eq!(
|
||||
(
|
||||
breakdown.sampling_duration_ms >= 5,
|
||||
breakdown.blocking_tool_critical_path_duration_ms >= 5,
|
||||
),
|
||||
(true, true)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn response_item_records_turn_ttft_for_first_output_signals() {
|
||||
assert!(response_item_records_turn_ttft(
|
||||
|
||||
Reference in New Issue
Block a user