Compare commits

...

4 Commits

Author SHA1 Message Date
rhan-oai
92f515bdf8 [codex-analytics] denormalize thread metadata onto turn events 2026-04-09 15:31:46 -07:00
rhan-oai
05cd2af4d5 [codex-analytics] add steering metadata 2026-04-09 15:31:46 -07:00
rhan-oai
0681ff6622 [codex-analytics] add token usage metadata 2026-04-09 15:26:54 -07:00
rhan-oai
f58ed8990c [codex-analytics] feature plumbing and emittance 2026-04-09 15:26:51 -07:00
31 changed files with 2547 additions and 94 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -3,6 +3,7 @@ use crate::events::TrackEventRequest;
use crate::events::TrackEventsRequest;
use crate::events::current_runtime_metadata;
use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
@@ -13,9 +14,15 @@ use crate::facts::SkillInvocation;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
@@ -160,6 +167,14 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
self.record_fact(AnalyticsFact::Request {
connection_id,
request_id,
request: Box::new(request),
});
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
return;
@@ -178,6 +193,18 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)),
));
}
pub fn track_turn_token_usage(&self, fact: TurnTokenUsageFact) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(
Box::new(fact),
)));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
@@ -227,6 +254,25 @@ impl AnalyticsEventsClient {
response: Box::new(response),
});
}
pub fn track_error_response(
&self,
connection_id: u64,
request_id: RequestId,
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
) {
self.record_fact(AnalyticsFact::ErrorResponse {
connection_id,
request_id,
error,
error_type,
});
}
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
}
async fn send_track_events(

View File

@@ -1,8 +1,15 @@
use crate::facts::AppInvocation;
use crate::facts::CodexTurnSteerEvent;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::TrackEventsContext;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRejectionReason;
use crate::facts::TurnSteerResult;
use crate::facts::TurnSubmissionType;
use codex_app_server_protocol::CodexErrorInfo;
use codex_login::default_client::originator;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::protocol::SessionSource;
@@ -17,14 +24,6 @@ pub enum AppServerRpcTransport {
InProcess,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ThreadInitializationMode {
New,
Forked,
Resumed,
}
#[derive(Serialize)]
pub(crate) struct TrackEventsRequest {
pub(crate) events: Vec<TrackEventRequest>,
@@ -37,6 +36,8 @@ pub(crate) enum TrackEventRequest {
ThreadInitialized(ThreadInitializedEvent),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
@@ -122,6 +123,77 @@ pub(crate) struct CodexAppUsedEventRequest {
pub(crate) event_params: CodexAppMetadata,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnEventParams {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
pub(crate) submission_type: Option<TurnSubmissionType>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) ephemeral: bool,
pub(crate) thread_source: Option<String>,
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) model: Option<String>,
pub(crate) model_provider: String,
pub(crate) sandbox_policy: Option<&'static str>,
pub(crate) reasoning_effort: Option<String>,
pub(crate) reasoning_summary: Option<String>,
pub(crate) service_tier: String,
pub(crate) approval_policy: String,
pub(crate) approvals_reviewer: String,
pub(crate) sandbox_network_access: bool,
pub(crate) collaboration_mode: Option<&'static str>,
pub(crate) personality: Option<String>,
pub(crate) num_input_images: usize,
pub(crate) is_first_turn: bool,
pub(crate) status: Option<TurnStatus>,
pub(crate) turn_error: Option<CodexErrorInfo>,
pub(crate) steer_count: Option<usize>,
pub(crate) total_tool_call_count: Option<usize>,
pub(crate) shell_command_count: Option<usize>,
pub(crate) file_change_count: Option<usize>,
pub(crate) mcp_tool_call_count: Option<usize>,
pub(crate) dynamic_tool_call_count: Option<usize>,
pub(crate) subagent_tool_call_count: Option<usize>,
pub(crate) web_search_count: Option<usize>,
pub(crate) image_generation_count: Option<usize>,
pub(crate) input_tokens: Option<i64>,
pub(crate) cached_input_tokens: Option<i64>,
pub(crate) output_tokens: Option<i64>,
pub(crate) reasoning_output_tokens: Option<i64>,
pub(crate) total_tokens: Option<i64>,
pub(crate) duration_ms: Option<u64>,
pub(crate) started_at: Option<u64>,
pub(crate) completed_at: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexTurnEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnSteerEventParams {
pub(crate) thread_id: String,
pub(crate) expected_turn_id: Option<String>,
pub(crate) accepted_turn_id: Option<String>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) num_input_images: usize,
pub(crate) result: TurnSteerResult,
pub(crate) rejection_reason: Option<TurnSteerRejectionReason>,
pub(crate) created_at: u64,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnSteerEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexTurnSteerEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexPluginMetadata {
pub(crate) plugin_id: Option<String>,
@@ -213,6 +285,25 @@ pub(crate) fn codex_plugin_used_metadata(
}
}
pub(crate) fn codex_turn_steer_event_params(
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
tracking: &TrackEventsContext,
turn_steer: CodexTurnSteerEvent,
) -> CodexTurnSteerEventParams {
CodexTurnSteerEventParams {
thread_id: tracking.thread_id.clone(),
expected_turn_id: turn_steer.expected_turn_id,
accepted_turn_id: turn_steer.accepted_turn_id,
app_server_client,
runtime,
num_input_images: turn_steer.num_input_images,
result: turn_steer.result,
rejection_reason: turn_steer.rejection_reason,
created_at: turn_steer.created_at,
}
}
pub(crate) fn thread_source_name(thread_source: &SessionSource) -> Option<&'static str> {
match thread_source {
SessionSource::Cli | SessionSource::VSCode | SessionSource::Exec => Some("user"),
@@ -276,3 +367,27 @@ fn subagent_parent_thread_id(subagent_source: &SubAgentSource) -> Option<String>
_ => None,
}
}
pub(crate) fn turn_subagent_source_name(thread_source: &SessionSource) -> Option<String> {
match thread_source {
SessionSource::SubAgent(subagent_source) => Some(subagent_source_name(subagent_source)),
SessionSource::Cli
| SessionSource::VSCode
| SessionSource::Exec
| SessionSource::Mcp
| SessionSource::Custom(_)
| SessionSource::Unknown => None,
}
}
pub(crate) fn turn_parent_thread_id(thread_source: &SessionSource) -> Option<String> {
match thread_source {
SessionSource::SubAgent(subagent_source) => subagent_parent_thread_id(subagent_source),
SessionSource::Cli
| SessionSource::VSCode
| SessionSource::Exec
| SessionSource::Mcp
| SessionSource::Custom(_)
| SessionSource::Unknown => None,
}
}

View File

@@ -3,11 +3,23 @@ use crate::events::CodexRuntimeMetadata;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::NonSteerableTurnKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use serde::Serialize;
use std::path::PathBuf;
@@ -30,6 +42,106 @@ pub fn build_track_events_context(
}
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSubmissionType {
Default,
Queued,
}
#[derive(Clone)]
pub struct TurnResolvedConfigFact {
pub turn_id: String,
pub thread_id: String,
pub num_input_images: usize,
pub submission_type: Option<TurnSubmissionType>,
pub ephemeral: bool,
pub session_source: SessionSource,
pub initialization_mode: ThreadInitializationMode,
pub model: String,
pub model_provider: String,
pub sandbox_policy: SandboxPolicy,
pub reasoning_effort: Option<ReasoningEffort>,
pub reasoning_summary: Option<ReasoningSummary>,
pub service_tier: Option<ServiceTier>,
pub approval_policy: AskForApproval,
pub approvals_reviewer: ApprovalsReviewer,
pub sandbox_network_access: bool,
pub collaboration_mode: ModeKind,
pub personality: Option<Personality>,
pub is_first_turn: bool,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadInitializationMode {
New,
Forked,
Resumed,
}
#[derive(Clone)]
pub struct TurnTokenUsageFact {
pub turn_id: String,
pub thread_id: String,
pub token_usage: TokenUsage,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnStatus {
Completed,
Failed,
Interrupted,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerResult {
Accepted,
Rejected,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerRejectionReason {
NoActiveTurn,
ExpectedTurnMismatch,
NonSteerableReview,
NonSteerableCompact,
EmptyInput,
InputTooLarge,
}
#[derive(Clone)]
pub struct CodexTurnSteerEvent {
pub expected_turn_id: Option<String>,
pub accepted_turn_id: Option<String>,
pub num_input_images: usize,
pub result: TurnSteerResult,
pub rejection_reason: Option<TurnSteerRejectionReason>,
pub created_at: u64,
}
#[derive(Clone, Copy, Debug)]
pub enum AnalyticsJsonRpcError {
TurnSteer(TurnSteerRequestError),
Input(InputError),
}
#[derive(Clone, Copy, Debug)]
pub enum TurnSteerRequestError {
NoActiveTurn,
ExpectedTurnMismatch,
ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind },
}
#[derive(Clone, Copy, Debug)]
pub enum InputError {
Empty,
TooLarge,
}
#[derive(Clone, Debug)]
pub struct SkillInvocation {
pub skill_name: String,
@@ -81,6 +193,12 @@ pub(crate) enum AnalyticsFact {
connection_id: u64,
response: Box<ClientResponse>,
},
ErrorResponse {
connection_id: u64,
request_id: RequestId,
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.
@@ -89,6 +207,8 @@ pub(crate) enum AnalyticsFact {
pub(crate) enum CustomAnalyticsFact {
SubAgentThreadStarted(SubAgentThreadStartedInput),
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
TurnTokenUsage(Box<TurnTokenUsageFact>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),

View File

@@ -5,11 +5,21 @@ mod reducer;
pub use client::AnalyticsEventsClient;
pub use events::AppServerRpcTransport;
pub use facts::AnalyticsJsonRpcError;
pub use facts::AppInvocation;
pub use facts::CodexTurnSteerEvent;
pub use facts::InputError;
pub use facts::InvocationType;
pub use facts::SkillInvocation;
pub use facts::SubAgentThreadStartedInput;
pub use facts::ThreadInitializationMode;
pub use facts::TrackEventsContext;
pub use facts::TurnResolvedConfigFact;
pub use facts::TurnStatus;
pub use facts::TurnSteerRejectionReason;
pub use facts::TurnSteerRequestError;
pub use facts::TurnSteerResult;
pub use facts::TurnTokenUsageFact;
pub use facts::build_track_events_context;
#[cfg(test)]

View File

@@ -5,40 +5,72 @@ use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexTurnEventParams;
use crate::events::CodexTurnEventRequest;
use crate::events::CodexTurnSteerEventRequest;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_plugin_metadata;
use crate::events::codex_plugin_used_metadata;
use crate::events::codex_turn_steer_event_params;
use crate::events::plugin_state_event_type;
use crate::events::subagent_thread_started_event_request;
use crate::events::thread_source_name;
use crate::events::turn_parent_thread_id;
use crate::events::turn_subagent_source_name;
use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexTurnSteerEvent;
use crate::facts::CustomAnalyticsFact;
use crate::facts::InputError;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
use crate::facts::PluginUsedInput;
use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRejectionReason;
use crate::facts::TurnSteerRequestError;
use crate::facts::TurnSteerResult;
use crate::facts::TurnTokenUsageFact;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::NonSteerableTurnKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::TokenUsage;
use sha1::Digest;
use std::collections::HashMap;
use std::path::Path;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
requests: HashMap<(u64, RequestId), RequestState>,
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
}
@@ -47,6 +79,42 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
enum RequestState {
TurnStart(PendingTurnStartState),
TurnSteer(PendingTurnSteerState),
}
struct PendingTurnStartState {
thread_id: String,
num_input_images: usize,
}
struct PendingTurnSteerState {
thread_id: String,
expected_turn_id: String,
num_input_images: usize,
created_at: u64,
}
#[derive(Clone)]
struct CompletedTurnState {
status: Option<TurnStatus>,
turn_error: Option<CodexErrorInfo>,
completed_at: u64,
duration_ms: Option<u64>,
}
struct TurnState {
connection_id: Option<u64>,
thread_id: Option<String>,
num_input_images: Option<usize>,
resolved_config: Option<TurnResolvedConfigFact>,
started_at: Option<u64>,
token_usage: Option<TokenUsage>,
completed: Option<CompletedTurnState>,
steer_count: usize,
}
impl AnalyticsReducer {
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
match input {
@@ -66,21 +134,39 @@ impl AnalyticsReducer {
);
}
AnalyticsFact::Request {
connection_id: _connection_id,
request_id: _request_id,
request: _request,
} => {}
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
connection_id,
response,
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::Notification(_notification) => {}
AnalyticsFact::ErrorResponse {
connection_id,
request_id,
error: _,
error_type,
} => {
self.ingest_error_response(connection_id, request_id, error_type, out);
}
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::TurnResolvedConfig(input) => {
self.ingest_turn_resolved_config(*input, out);
}
CustomAnalyticsFact::TurnTokenUsage(input) => {
self.ingest_turn_token_usage(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -135,6 +221,82 @@ impl AnalyticsReducer {
));
}
fn ingest_request(
&mut self,
connection_id: u64,
request_id: RequestId,
request: ClientRequest,
) {
match request {
ClientRequest::TurnStart { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::TurnStart(PendingTurnStartState {
thread_id: params.thread_id,
num_input_images: num_input_images(&params.input),
}),
);
}
ClientRequest::TurnSteer { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::TurnSteer(PendingTurnSteerState {
thread_id: params.thread_id,
expected_turn_id: params.expected_turn_id,
num_input_images: num_input_images(&params.input),
created_at: now_unix_seconds(),
}),
);
}
_ => {}
}
}
fn ingest_turn_resolved_config(
&mut self,
input: TurnResolvedConfigFact,
out: &mut Vec<TrackEventRequest>,
) {
let turn_id = input.turn_id.clone();
let thread_id = input.thread_id.clone();
let num_input_images = input.num_input_images;
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
steer_count: 0,
});
turn_state.thread_id = Some(thread_id);
turn_state.num_input_images = Some(num_input_images);
turn_state.resolved_config = Some(input);
self.maybe_emit_turn_event(&turn_id, out);
}
fn ingest_turn_token_usage(
&mut self,
input: TurnTokenUsageFact,
out: &mut Vec<TrackEventRequest>,
) {
let turn_id = input.turn_id.clone();
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
steer_count: 0,
});
turn_state.thread_id = Some(input.thread_id);
turn_state.token_usage = Some(input.token_usage);
self.maybe_emit_turn_event(&turn_id, out);
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,
@@ -235,24 +397,186 @@ impl AnalyticsReducer {
response: ClientResponse,
out: &mut Vec<TrackEventRequest>,
) {
let (thread, model, initialization_mode) = match response {
ClientResponse::ThreadStart { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::New,
),
ClientResponse::ThreadResume { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Resumed,
),
ClientResponse::ThreadFork { response, .. } => (
response.thread,
response.model,
ThreadInitializationMode::Forked,
),
_ => return,
match response {
ClientResponse::ThreadStart { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::New,
out,
);
}
ClientResponse::ThreadResume { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Resumed,
out,
);
}
ClientResponse::ThreadFork { response, .. } => {
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Forked,
out,
);
}
ClientResponse::TurnStart {
request_id,
response,
} => {
let turn_id = response.turn.id;
let Some(RequestState::TurnStart(pending_request)) =
self.requests.remove(&(connection_id, request_id))
else {
return;
};
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
steer_count: 0,
});
turn_state.connection_id = Some(connection_id);
turn_state.thread_id = Some(pending_request.thread_id);
turn_state.num_input_images = Some(pending_request.num_input_images);
self.maybe_emit_turn_event(&turn_id, out);
}
ClientResponse::TurnSteer {
request_id,
response,
} => {
self.ingest_turn_steer_response(connection_id, request_id, response, out);
}
_ => {}
}
}
fn ingest_error_response(
&mut self,
connection_id: u64,
request_id: RequestId,
error_type: Option<AnalyticsJsonRpcError>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(request) = self.requests.remove(&(connection_id, request_id)) else {
return;
};
self.ingest_request_error_response(connection_id, request, error_type, out);
}
fn ingest_request_error_response(
&mut self,
connection_id: u64,
request: RequestState,
error_type: Option<AnalyticsJsonRpcError>,
out: &mut Vec<TrackEventRequest>,
) {
match request {
RequestState::TurnStart(_) => {}
RequestState::TurnSteer(pending_request) => {
self.ingest_turn_steer_error_response(
connection_id,
pending_request,
error_type,
out,
);
}
}
}
fn ingest_turn_steer_error_response(
&mut self,
connection_id: u64,
pending_request: PendingTurnSteerState,
error_type: Option<AnalyticsJsonRpcError>,
out: &mut Vec<TrackEventRequest>,
) {
self.emit_turn_steer_event(
connection_id,
pending_request,
/*accepted_turn_id*/ None,
TurnSteerResult::Rejected,
rejection_reason_from_error_type(error_type),
out,
);
}
fn ingest_notification(
&mut self,
notification: ServerNotification,
out: &mut Vec<TrackEventRequest>,
) {
match notification {
ServerNotification::TurnStarted(notification) => {
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
steer_count: 0,
});
turn_state.started_at = notification
.turn
.started_at
.and_then(|started_at| u64::try_from(started_at).ok());
}
ServerNotification::TurnCompleted(notification) => {
let turn_state =
self.turns
.entry(notification.turn.id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
steer_count: 0,
});
turn_state.completed = Some(CompletedTurnState {
status: analytics_turn_status(notification.turn.status),
turn_error: notification
.turn
.error
.and_then(|error| error.codex_error_info),
completed_at: notification
.turn
.completed_at
.and_then(|completed_at| u64::try_from(completed_at).ok())
.unwrap_or_default(),
duration_ms: notification
.turn
.duration_ms
.and_then(|duration_ms| u64::try_from(duration_ms).ok()),
});
let turn_id = notification.turn.id;
self.maybe_emit_turn_event(&turn_id, out);
}
_ => {}
}
}
fn emit_thread_initialized(
&mut self,
connection_id: u64,
thread: codex_app_server_protocol::Thread,
model: String,
initialization_mode: ThreadInitializationMode,
out: &mut Vec<TrackEventRequest>,
) {
let thread_source: SessionSource = thread.source.into();
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
@@ -275,6 +599,281 @@ impl AnalyticsReducer {
},
));
}
fn ingest_turn_steer_response(
&mut self,
connection_id: u64,
request_id: RequestId,
response: TurnSteerResponse,
out: &mut Vec<TrackEventRequest>,
) {
let Some(RequestState::TurnSteer(pending_request)) =
self.requests.remove(&(connection_id, request_id))
else {
return;
};
if let Some(turn_state) = self.turns.get_mut(&response.turn_id) {
turn_state.steer_count += 1;
}
self.emit_turn_steer_event(
connection_id,
pending_request,
Some(response.turn_id),
TurnSteerResult::Accepted,
/*rejection_reason*/ None,
out,
);
}
fn emit_turn_steer_event(
&mut self,
connection_id: u64,
pending_request: PendingTurnSteerState,
accepted_turn_id: Option<String>,
result: TurnSteerResult,
rejection_reason: Option<TurnSteerRejectionReason>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let tracking = TrackEventsContext {
model_slug: String::new(),
thread_id: pending_request.thread_id,
turn_id: accepted_turn_id
.as_deref()
.unwrap_or(pending_request.expected_turn_id.as_str())
.to_string(),
};
let turn_steer = CodexTurnSteerEvent {
expected_turn_id: Some(pending_request.expected_turn_id),
accepted_turn_id,
num_input_images: pending_request.num_input_images,
result,
rejection_reason,
created_at: pending_request.created_at,
};
out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest {
event_type: "codex_turn_steer_event",
event_params: codex_turn_steer_event_params(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
&tracking,
turn_steer,
),
}));
}
fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec<TrackEventRequest>) {
let Some(turn_state) = self.turns.get(turn_id) else {
return;
};
if turn_state.thread_id.is_none()
|| turn_state.num_input_images.is_none()
|| turn_state.resolved_config.is_none()
|| turn_state.completed.is_none()
{
return;
}
let connection_metadata = turn_state
.connection_id
.and_then(|connection_id| self.connections.get(&connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
return;
};
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
app_server_client,
runtime,
turn_id.to_string(),
turn_state,
),
},
)));
self.turns.remove(turn_id);
}
}
fn codex_turn_event_params(
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
turn_id: String,
turn_state: &TurnState,
) -> CodexTurnEventParams {
let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = (
turn_state.thread_id.clone(),
turn_state.num_input_images,
turn_state.resolved_config.clone(),
turn_state.completed.clone(),
) else {
unreachable!("turn event params require a fully populated turn state");
};
let started_at = turn_state.started_at;
let TurnResolvedConfigFact {
turn_id: _resolved_turn_id,
thread_id: _resolved_thread_id,
num_input_images: _resolved_num_input_images,
submission_type,
ephemeral,
session_source,
initialization_mode,
model,
model_provider,
sandbox_policy,
reasoning_effort,
reasoning_summary,
service_tier,
approval_policy,
approvals_reviewer,
sandbox_network_access,
collaboration_mode,
personality,
is_first_turn,
} = resolved_config;
let token_usage = turn_state.token_usage.clone();
CodexTurnEventParams {
thread_id,
turn_id,
app_server_client,
runtime,
submission_type,
ephemeral,
thread_source: thread_source_name(&session_source).map(str::to_string),
initialization_mode,
subagent_source: turn_subagent_source_name(&session_source),
parent_thread_id: turn_parent_thread_id(&session_source),
model: Some(model),
model_provider,
sandbox_policy: Some(sandbox_policy_mode(&sandbox_policy)),
reasoning_effort: reasoning_effort.map(|value| value.to_string()),
reasoning_summary: reasoning_summary_mode(reasoning_summary),
service_tier: service_tier
.map(|value| value.to_string())
.unwrap_or_else(|| "default".to_string()),
approval_policy: approval_policy.to_string(),
approvals_reviewer: approvals_reviewer.to_string(),
sandbox_network_access,
collaboration_mode: Some(collaboration_mode_mode(collaboration_mode)),
personality: personality_mode(personality),
num_input_images,
is_first_turn,
status: completed.status,
turn_error: completed.turn_error,
steer_count: Some(turn_state.steer_count),
total_tool_call_count: None,
shell_command_count: None,
file_change_count: None,
mcp_tool_call_count: None,
dynamic_tool_call_count: None,
subagent_tool_call_count: None,
web_search_count: None,
image_generation_count: None,
input_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.input_tokens),
cached_input_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.cached_input_tokens),
output_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.output_tokens),
reasoning_output_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.reasoning_output_tokens),
total_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.total_tokens),
duration_ms: completed.duration_ms,
started_at,
completed_at: Some(completed.completed_at),
}
}
fn sandbox_policy_mode(sandbox_policy: &SandboxPolicy) -> &'static str {
match sandbox_policy {
SandboxPolicy::DangerFullAccess => "full_access",
SandboxPolicy::ReadOnly { .. } => "read_only",
SandboxPolicy::WorkspaceWrite { .. } => "workspace_write",
SandboxPolicy::ExternalSandbox { .. } => "external_sandbox",
}
}
fn collaboration_mode_mode(mode: ModeKind) -> &'static str {
match mode {
ModeKind::Plan => "plan",
ModeKind::Default | ModeKind::PairProgramming | ModeKind::Execute => "default",
}
}
fn reasoning_summary_mode(summary: Option<ReasoningSummary>) -> Option<String> {
match summary {
Some(ReasoningSummary::None) | None => None,
Some(summary) => Some(summary.to_string()),
}
}
fn personality_mode(personality: Option<Personality>) -> Option<String> {
match personality {
Some(Personality::None) | None => None,
Some(personality) => Some(personality.to_string()),
}
}
fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Option<TurnStatus> {
match status {
codex_app_server_protocol::TurnStatus::Completed => Some(TurnStatus::Completed),
codex_app_server_protocol::TurnStatus::Failed => Some(TurnStatus::Failed),
codex_app_server_protocol::TurnStatus::Interrupted => Some(TurnStatus::Interrupted),
codex_app_server_protocol::TurnStatus::InProgress => None,
}
}
fn num_input_images(input: &[UserInput]) -> usize {
input
.iter()
.filter(|item| matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }))
.count()
}
fn now_unix_seconds() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn rejection_reason_from_error_type(
error_type: Option<AnalyticsJsonRpcError>,
) -> Option<TurnSteerRejectionReason> {
match error_type {
Some(AnalyticsJsonRpcError::TurnSteer(TurnSteerRequestError::NoActiveTurn)) => {
Some(TurnSteerRejectionReason::NoActiveTurn)
}
Some(AnalyticsJsonRpcError::TurnSteer(TurnSteerRequestError::ExpectedTurnMismatch)) => {
Some(TurnSteerRejectionReason::ExpectedTurnMismatch)
}
Some(AnalyticsJsonRpcError::TurnSteer(TurnSteerRequestError::ActiveTurnNotSteerable {
turn_kind,
})) => match turn_kind {
NonSteerableTurnKind::Review => Some(TurnSteerRejectionReason::NonSteerableReview),
NonSteerableTurnKind::Compact => Some(TurnSteerRejectionReason::NonSteerableCompact),
},
Some(AnalyticsJsonRpcError::Input(InputError::Empty)) => {
Some(TurnSteerRejectionReason::EmptyInput)
}
Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)) => {
Some(TurnSteerRejectionReason::InputTooLarge)
}
None => None,
}
}
pub(crate) fn skill_id_for_local_skill(

View File

@@ -12,6 +12,7 @@ use crate::thread_state::TurnSummary;
use crate::thread_state::resolve_server_request_on_thread_listener;
use crate::thread_status::ThreadWatchActiveGuard;
use crate::thread_status::ThreadWatchManager;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile;
use codex_app_server_protocol::AgentMessageDeltaNotification;
@@ -167,6 +168,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_manager: Arc<ThreadManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<tokio::sync::Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
@@ -202,6 +204,10 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn,
};
if let Some(analytics_events_client) = analytics_events_client.as_ref() {
analytics_events_client
.track_notification(ServerNotification::TurnStarted(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::TurnStarted(notification))
.await;
@@ -218,6 +224,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_complete_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -1731,6 +1738,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_aborted_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -1908,6 +1916,7 @@ async fn emit_turn_completed_with_status(
conversation_id: ThreadId,
event_turn_id: String,
turn_completion_metadata: TurnCompletionMetadata,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
@@ -1922,6 +1931,10 @@ async fn emit_turn_completed_with_status(
duration_ms: turn_completion_metadata.duration_ms,
},
};
if let Some(analytics_events_client) = analytics_events_client {
analytics_events_client
.track_notification(ServerNotification::TurnCompleted(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::TurnCompleted(notification))
.await;
@@ -2114,6 +2127,7 @@ async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
turn_complete_event: TurnCompleteEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -2134,6 +2148,7 @@ async fn handle_turn_complete(
completed_at: turn_complete_event.completed_at,
duration_ms: turn_complete_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -2143,6 +2158,7 @@ async fn handle_turn_interrupted(
conversation_id: ThreadId,
event_turn_id: String,
turn_aborted_event: TurnAbortedEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -2158,6 +2174,7 @@ async fn handle_turn_interrupted(
completed_at: turn_aborted_event.completed_at,
duration_ms: turn_aborted_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -2897,6 +2914,7 @@ mod tests {
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::TurnPlanStepStatus;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::items::HookPromptFragment;
use codex_protocol::items::build_hook_prompt_message;
@@ -3018,6 +3036,7 @@ mod tests {
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
analytics_events_client: AnalyticsEventsClient,
codex_home: PathBuf,
}
@@ -3032,6 +3051,7 @@ mod tests {
self.conversation_id,
self.conversation.clone(),
self.thread_manager.clone(),
Some(self.analytics_events_client.clone()),
self.outgoing.clone(),
self.thread_state.clone(),
self.thread_watch_manager.clone(),
@@ -3340,6 +3360,13 @@ mod tests {
outgoing: outgoing.clone(),
thread_state: thread_state.clone(),
thread_watch_manager: thread_watch_manager.clone(),
analytics_events_client: AnalyticsEventsClient::new(
AuthManager::from_auth_for_testing(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
),
"http://localhost".to_string(),
Some(false),
),
codex_home: codex_home.path().to_path_buf(),
};
@@ -3748,6 +3775,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3796,6 +3824,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3843,6 +3872,7 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -4109,6 +4139,7 @@ mod tests {
conversation_a,
a_turn1.clone(),
turn_complete_event(&a_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -4130,6 +4161,7 @@ mod tests {
conversation_b,
b_turn1.clone(),
turn_complete_event(&b_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -4141,6 +4173,7 @@ mod tests {
conversation_a,
a_turn2.clone(),
turn_complete_event(&a_turn2),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)

View File

@@ -21,6 +21,9 @@ use chrono::DateTime;
use chrono::SecondsFormat;
use chrono::Utc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AnalyticsJsonRpcError;
use codex_analytics::InputError;
use codex_analytics::TurnSteerRequestError;
use codex_app_server_protocol::Account;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
@@ -35,7 +38,7 @@ use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo as AppServerCodexErrorInfo;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
use codex_app_server_protocol::CommandExecParams;
@@ -497,6 +500,22 @@ impl CodexMessageProcessor {
}
}
fn track_error_response(
&self,
request_id: &ConnectionRequestId,
error: &JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
) {
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_error_response(
request_id.connection_id.0,
request_id.request_id.clone(),
error.clone(),
error_type,
);
}
}
async fn load_thread(
&self,
thread_id: &str,
@@ -6566,12 +6585,18 @@ impl CodexMessageProcessor {
app_server_client_version: Option<String>,
) {
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.track_error_response(&request_id, &error, None);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -6583,6 +6608,7 @@ impl CodexMessageProcessor {
)
.await
{
self.track_error_response(&request_id, &error, None);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -6665,6 +6691,15 @@ impl CodexMessageProcessor {
};
let response = TurnStartResponse { turn };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
@@ -6673,6 +6708,7 @@ impl CodexMessageProcessor {
message: format!("failed to start turn: {err}"),
data: None,
};
self.track_error_response(&request_id, &error, None);
self.outgoing.send_error(request_id, error).await;
}
}
@@ -6697,23 +6733,31 @@ impl CodexMessageProcessor {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.track_error_response(&request_id, &error, None);
self.outgoing.send_error(request_id, error).await;
return;
}
};
if params.expected_turn_id.is_empty() {
self.send_invalid_request_error(
request_id,
"expectedTurnId must not be empty".to_string(),
)
.await;
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "expectedTurnId must not be empty".to_string(),
data: None,
};
self.track_error_response(&request_id, &error, None);
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.record_request_turn_id(&request_id, &params.expected_turn_id)
.await;
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -6730,19 +6774,34 @@ impl CodexMessageProcessor {
{
Ok(turn_id) => {
let response = TurnSteerResponse { turn_id };
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::TurnSteer {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
}
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let (code, message, data) = match err {
let (code, message, data, error_type) = match err {
SteerInputError::NoActiveTurn(_) => (
INVALID_REQUEST_ERROR_CODE,
"no active turn to steer".to_string(),
None,
Some(AnalyticsJsonRpcError::TurnSteer(
TurnSteerRequestError::NoActiveTurn,
)),
),
SteerInputError::ExpectedTurnMismatch { expected, actual } => (
INVALID_REQUEST_ERROR_CODE,
format!("expected active turn id `{expected}` but found `{actual}`"),
None,
Some(AnalyticsJsonRpcError::TurnSteer(
TurnSteerRequestError::ExpectedTurnMismatch,
)),
),
SteerInputError::ActiveTurnNotSteerable { turn_kind } => {
let message = match turn_kind {
@@ -6755,11 +6814,9 @@ impl CodexMessageProcessor {
};
let error = TurnError {
message: message.clone(),
codex_error_info: Some(
AppServerCodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: turn_kind.into(),
},
),
codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: turn_kind.into(),
}),
additional_details: None,
};
let data = match serde_json::to_value(error) {
@@ -6772,12 +6829,22 @@ impl CodexMessageProcessor {
None
}
};
(INVALID_REQUEST_ERROR_CODE, message, data)
(
INVALID_REQUEST_ERROR_CODE,
message,
data,
Some(AnalyticsJsonRpcError::TurnSteer(
TurnSteerRequestError::ActiveTurnNotSteerable {
turn_kind: turn_kind.into(),
},
)),
)
}
SteerInputError::EmptyInput => (
INVALID_REQUEST_ERROR_CODE,
"input must not be empty".to_string(),
None,
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
),
};
let error = JSONRPCErrorError {
@@ -6785,6 +6852,7 @@ impl CodexMessageProcessor {
message,
data,
};
self.track_error_response(&request_id, &error, error_type);
self.outgoing.send_error(request_id, error).await;
}
}
@@ -7449,6 +7517,9 @@ impl CodexMessageProcessor {
conversation_id,
conversation.clone(),
thread_manager.clone(),
listener_task_context
.general_analytics_enabled
.then(|| listener_task_context.analytics_events_client.clone()),
thread_outgoing,
thread_state.clone(),
thread_watch_manager.clone(),

View File

@@ -223,6 +223,11 @@ impl MessageProcessor {
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
@@ -233,12 +238,8 @@ impl MessageProcessor {
.enabled(Feature::DefaultModeRequestUserInput),
},
environment_manager,
Some(analytics_events_client.clone()),
));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
@@ -676,6 +677,16 @@ impl MessageProcessor {
self.outgoing.send_error(connection_request_id, error).await;
return;
}
if self.config.features.enabled(Feature::GeneralAnalytics)
&& let ClientRequest::TurnStart { request_id, .. }
| ClientRequest::TurnSteer { request_id, .. } = &codex_request
{
self.analytics_events_client.track_request(
connection_id.0,
request_id.clone(),
codex_request.clone(),
);
}
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {

View File

@@ -78,3 +78,31 @@ model_provider = "{model_provider_id}"
),
)
}
pub fn write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home: &Path,
server_uri: &str,
chatgpt_base_url: &str,
) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
chatgpt_base_url = "{chatgpt_base_url}"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -14,6 +14,7 @@ pub use auth_fixtures::encode_id_token;
pub use auth_fixtures::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
pub use config::write_mock_responses_config_toml;
pub use config::write_mock_responses_config_toml_with_chatgpt_base_url;
pub use core_test_support::format_with_current_shell;
pub use core_test_support::format_with_current_shell_display;
pub use core_test_support::format_with_current_shell_display_non_login;

View File

@@ -80,6 +80,24 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> {
}
pub(crate) async fn enable_analytics_capture(server: &MockServer, codex_home: &Path) -> Result<()> {
let config_path = codex_home.join("config.toml");
let config_toml = std::fs::read_to_string(&config_path)?;
if !config_toml.contains("[features]") {
std::fs::write(
&config_path,
format!("{config_toml}\n[features]\ngeneral_analytics = true\n"),
)?;
} else if !config_toml.contains("general_analytics") {
std::fs::write(
&config_path,
config_toml.replace("[features]\n", "[features]\ngeneral_analytics = true\n"),
)?;
}
mount_analytics_capture(server, codex_home).await
}
pub(crate) async fn mount_analytics_capture(server: &MockServer, codex_home: &Path) -> Result<()> {
Mock::given(method("POST"))
.and(path("/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200))
@@ -120,6 +138,41 @@ pub(crate) async fn wait_for_analytics_payload(
serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
}
pub(crate) async fn wait_for_analytics_event(
server: &MockServer,
read_timeout: Duration,
event_type: &str,
) -> Result<Value> {
timeout(read_timeout, async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
for request in &requests {
if request.method != "POST"
|| request.url.path() != "/codex/analytics-events/events"
{
continue;
}
let payload: Value = serde_json::from_slice(&request.body)
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
return Ok::<Value, anyhow::Error>(event.clone());
}
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?
}
pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> {
let events = payload["events"]
.as_array()

View File

@@ -39,7 +39,7 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::enable_analytics_capture;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
@@ -173,7 +173,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
&server.uri(),
/*general_analytics_enabled*/ true,
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -206,7 +206,7 @@ async fn thread_start_does_not_track_thread_initialized_analytics_without_featur
&server.uri(),
/*general_analytics_enabled*/ false,
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

View File

@@ -3,6 +3,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
@@ -43,14 +44,15 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
std::fs::create_dir(&working_directory)?;
// Mock server: long-running shell command then (after abort) nothing else needed.
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri(), "never", "danger-full-access")?;
let server =
create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri(), "never", "workspace-write")?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -87,6 +89,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let turn_id = turn.id.clone();
// Give the command a brief moment to start.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
@@ -96,7 +99,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
let interrupt_id = mcp
.send_turn_interrupt_request(TurnInterruptParams {
thread_id: thread_id.clone(),
turn_id: turn.id,
turn_id: turn_id.clone(),
})
.await?;
let interrupt_resp: JSONRPCResponse = timeout(

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_exec_command_sse_response;
@@ -9,6 +10,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::ByteRange;
@@ -64,6 +66,10 @@ use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use super::analytics::enable_analytics_capture;
use super::analytics::wait_for_analytics_event;
use super::analytics::wait_for_analytics_payload;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
@@ -238,6 +244,158 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home.path(),
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Image {
url: "https://example.com/a.png".to_string(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
assert_eq!(event["event_params"]["turn_id"], turn.id);
assert_eq!(
event["event_params"]["app_server_client"]["product_client_id"],
DEFAULT_CLIENT_NAME
);
assert_eq!(event["event_params"]["model"], "mock-model");
assert_eq!(event["event_params"]["model_provider"], "mock_provider");
assert_eq!(event["event_params"]["sandbox_policy"], "read_only");
assert_eq!(event["event_params"]["ephemeral"], false);
assert_eq!(event["event_params"]["thread_source"], "user");
assert_eq!(event["event_params"]["initialization_mode"], "new");
assert_eq!(
event["event_params"]["subagent_source"],
serde_json::Value::Null
);
assert_eq!(
event["event_params"]["parent_thread_id"],
serde_json::Value::Null
);
assert_eq!(event["event_params"]["num_input_images"], 1);
assert_eq!(event["event_params"]["status"], "completed");
assert!(event["event_params"]["started_at"].as_u64().is_some());
assert!(event["event_params"]["completed_at"].as_u64().is_some());
assert!(event["event_params"]["duration_ms"].as_u64().is_some());
assert_eq!(event["event_params"]["input_tokens"], 0);
assert_eq!(event["event_params"]["cached_input_tokens"], 0);
assert_eq!(event["event_params"]["output_tokens"], 0);
assert_eq!(event["event_params"]["reasoning_output_tokens"], 0);
assert_eq!(event["event_params"]["total_tokens"], 0);
Ok(())
}
#[tokio::test]
async fn turn_start_does_not_track_turn_event_analytics_without_feature() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home.path(),
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
let config_path = codex_home.path().join("config.toml");
let config_toml = std::fs::read_to_string(&config_path)?;
std::fs::write(
&config_path,
config_toml.replace("general_analytics = true", "general_analytics = false"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _ = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let payload = wait_for_analytics_payload(&server, std::time::Duration::from_millis(250)).await;
assert!(
payload.is_err(),
"turn analytics should be gated off when general_analytics is disabled"
);
Ok(())
}
#[tokio::test]
async fn turn_start_accepts_text_at_limit_with_mention_item() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];

View File

@@ -6,6 +6,7 @@ use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::JSONRPCError;
@@ -23,6 +24,9 @@ use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use tempfile::TempDir;
use tokio::time::timeout;
use super::analytics::enable_analytics_capture;
use super::analytics::wait_for_analytics_event;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
@@ -32,7 +36,12 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
std::fs::create_dir(&codex_home)?;
let server = create_mock_responses_server_sequence(vec![]).await;
create_config_toml(&codex_home, &server.uri())?;
write_mock_responses_config_toml_with_chatgpt_base_url(
&codex_home,
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, &codex_home).await?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -52,7 +61,7 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id,
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
@@ -67,6 +76,21 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
.await??;
assert_eq!(steer_err.error.code, -32600);
let event =
wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_steer_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
assert_eq!(event["event_params"]["result"], "rejected");
assert_eq!(event["event_params"]["num_input_images"], 0);
assert_eq!(
event["event_params"]["expected_turn_id"],
"turn-does-not-exist"
);
assert_eq!(
event["event_params"]["accepted_turn_id"],
serde_json::Value::Null
);
assert_eq!(event["event_params"]["rejection_reason"], "no_active_turn");
Ok(())
}
@@ -95,7 +119,12 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri())?;
write_mock_responses_config_toml_with_chatgpt_base_url(
&codex_home,
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, &codex_home).await?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -198,7 +227,12 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri())?;
write_mock_responses_config_toml_with_chatgpt_base_url(
&codex_home,
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, &codex_home).await?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -258,31 +292,20 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let steer: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
assert_eq!(steer.turn_id, turn.id);
let event =
wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_steer_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
assert_eq!(event["event_params"]["result"], "accepted");
assert_eq!(event["event_params"]["num_input_images"], 0);
assert_eq!(event["event_params"]["expected_turn_id"], turn.id);
assert_eq!(event["event_params"]["accepted_turn_id"], turn.id);
assert_eq!(
event["event_params"]["rejection_reason"],
serde_json::Value::Null
);
mcp.interrupt_turn_and_wait_for_aborted(thread.id, steer.turn_id, DEFAULT_READ_TIMEOUT)
.await?;
Ok(())
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -52,6 +52,8 @@ use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppInvocation;
use codex_analytics::InvocationType;
use codex_analytics::SubAgentThreadStartedInput;
use codex_analytics::ThreadInitializationMode;
use codex_analytics::TurnResolvedConfigFact;
use codex_analytics::build_track_events_context;
use codex_app_server_protocol::McpServerElicitationRequest;
use codex_app_server_protocol::McpServerElicitationRequestParams;
@@ -186,6 +188,7 @@ use crate::config::resolve_web_search_mode_for_turn;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::environment_context::EnvironmentContext;
use crate::thread_rollout_truncation::initial_history_has_prior_user_turns;
use codex_config::CONFIG_TOML_FILE;
use codex_config::types::McpServerConfig;
use codex_config::types::ShellEnvironmentPolicy;
@@ -428,6 +431,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
@@ -482,6 +486,7 @@ impl Codex {
user_shell_override,
inherited_exec_policy,
parent_trace: _,
analytics_events_client,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -646,6 +651,7 @@ impl Codex {
app_server_client_name: None,
app_server_client_version: None,
session_source,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
@@ -672,6 +678,7 @@ impl Codex {
skills_watcher,
agent_control,
environment,
analytics_events_client,
)
.await
.map_err(|e| {
@@ -1070,6 +1077,14 @@ impl TurnContext {
}
}
fn thread_initialization_mode(initial_history: &InitialHistory) -> ThreadInitializationMode {
match initial_history {
InitialHistory::New => ThreadInitializationMode::New,
InitialHistory::Forked(_) => ThreadInitializationMode::Forked,
InitialHistory::Resumed(_) => ThreadInitializationMode::Resumed,
}
}
fn local_time_context() -> (String, String) {
match iana_time_zone::get_timezone() {
Ok(timezone) => (Local::now().format("%Y-%m-%d").to_string(), timezone),
@@ -1131,6 +1146,7 @@ pub(crate) struct SessionConfiguration {
app_server_client_version: Option<String>,
/// Source of the session (cli, vscode, exec, mcp, ...)
session_source: SessionSource,
thread_initialization_mode: ThreadInitializationMode,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
@@ -1155,6 +1171,7 @@ impl SessionConfiguration {
reasoning_effort: self.collaboration_mode.reasoning_effort(),
personality: self.personality,
session_source: self.session_source.clone(),
initialization_mode: self.thread_initialization_mode,
}
}
@@ -1521,12 +1538,15 @@ impl Session {
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
environment: Option<Arc<Environment>>,
analytics_events_client: Option<AnalyticsEventsClient>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
session_configuration.collaboration_mode.model(),
session_configuration.provider
);
session_configuration.thread_initialization_mode =
thread_initialization_mode(&initial_history);
let forked_from_id = initial_history.forked_from_id();
let (conversation_id, rollout_params) = match &initial_history {
@@ -1925,11 +1945,13 @@ impl Session {
),
shell_zsh_path: config.zsh_path.clone(),
main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(),
analytics_events_client: AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
),
analytics_events_client: analytics_events_client.unwrap_or_else(|| {
AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
)
}),
hooks,
rollout: Mutex::new(rollout_recorder),
user_shell: Arc::new(default_shell),
@@ -2244,6 +2266,11 @@ impl Session {
SessionSource::SubAgent(_)
)
};
let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history);
{
let mut state = self.state.lock().await;
state.set_next_turn_is_first(!has_prior_user_turns);
}
match conversation_history {
InitialHistory::New => {
// Defer initial context insertion until the first real turn starts so
@@ -6012,6 +6039,8 @@ pub(crate) async fn run_turn(
.await;
}
track_turn_resolved_config_analytics(&sess, &turn_context, &input).await;
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
@@ -6298,6 +6327,53 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn track_turn_resolved_config_analytics(
sess: &Session,
turn_context: &TurnContext,
input: &[UserInput],
) {
if !sess.enabled(Feature::GeneralAnalytics) {
return;
}
let thread_config = {
let state = sess.state.lock().await;
state.session_configuration.thread_config_snapshot()
};
let is_first_turn = {
let mut state = sess.state.lock().await;
state.take_next_turn_is_first()
};
sess.services
.analytics_events_client
.track_turn_resolved_config(TurnResolvedConfigFact {
turn_id: turn_context.sub_id.clone(),
thread_id: sess.conversation_id.to_string(),
num_input_images: input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
submission_type: None,
ephemeral: thread_config.ephemeral,
session_source: thread_config.session_source,
initialization_mode: thread_config.initialization_mode,
model: turn_context.model_info.slug.clone(),
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
collaboration_mode: turn_context.collaboration_mode.mode,
personality: turn_context.personality,
is_first_turn,
});
}
async fn run_pre_sampling_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,

View File

@@ -95,6 +95,7 @@ pub(crate) async fn run_codex_thread_interactive(
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
parent_trace: None,
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
})
.await?;
if parent_session.enabled(codex_features::Feature::GeneralAnalytics) {

View File

@@ -13,6 +13,7 @@ use crate::function_tool::FunctionCallError;
use crate::shell::default_user_shell;
use crate::tools::format_exec_output_str;
use codex_analytics::ThreadInitializationMode;
use codex_features::Features;
use codex_login::CodexAuth;
use codex_mcp::ToolInfo;
@@ -1872,6 +1873,7 @@ async fn set_rate_limits_retains_previous_credits() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
@@ -1974,6 +1976,7 @@ async fn set_rate_limits_updates_plan_type_when_present() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
@@ -2323,6 +2326,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
@@ -2586,6 +2590,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
@@ -2620,6 +2625,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
.await
.expect("create environment"),
)),
/*analytics_events_client*/ None,
)
.await;
@@ -2689,6 +2695,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
@@ -3530,6 +3537,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
app_server_client_name: None,
app_server_client_version: None,
session_source: SessionSource::Exec,
thread_initialization_mode: ThreadInitializationMode::New,
dynamic_tools,
persist_extended_history: false,
inherited_shell_snapshot: None,

View File

@@ -457,6 +457,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
user_shell_override: None,
parent_trace: None,
analytics_events_client: None,
})
.await
.expect("spawn guardian subagent");

View File

@@ -3,6 +3,7 @@ use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::config::ConstraintResult;
use crate::file_watcher::WatchRegistration;
use codex_analytics::ThreadInitializationMode;
use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::Personality;
@@ -42,6 +43,7 @@ pub struct ThreadConfigSnapshot {
pub reasoning_effort: Option<ReasoningEffort>,
pub personality: Option<Personality>,
pub session_source: SessionSource,
pub initialization_mode: ThreadInitializationMode,
}
pub struct CodexThread {

View File

@@ -39,6 +39,7 @@ pub async fn build_prompt_input(
.enabled(Feature::DefaultModeRequestUserInput),
},
Arc::new(EnvironmentManager::from_env()),
/*analytics_events_client*/ None,
);
let thread = thread_manager.start_thread(config).await?;

View File

@@ -33,6 +33,7 @@ pub(crate) struct SessionState {
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
granted_permissions: Option<PermissionProfile>,
next_turn_is_first: bool,
}
impl SessionState {
@@ -51,6 +52,7 @@ impl SessionState {
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
granted_permissions: None,
next_turn_is_first: true,
}
}
@@ -73,6 +75,16 @@ impl SessionState {
self.previous_turn_settings = previous_turn_settings;
}
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
self.next_turn_is_first = value;
}
pub(crate) fn take_next_turn_is_first(&mut self) -> bool {
let is_first_turn = self.next_turn_is_first;
self.next_turn_is_first = false;
is_first_turn
}
pub(crate) fn clone_history(&self) -> ContextManager {
self.history.clone()
}

View File

@@ -30,6 +30,7 @@ use crate::hook_runtime::record_pending_input;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
use codex_login::AuthManager;
use codex_models_manager::manager::ModelsManager;
use codex_otel::SessionTelemetry;
@@ -485,6 +486,13 @@ impl Session {
- token_usage_at_turn_start.total_tokens)
.max(0),
};
self.services
.analytics_events_client
.track_turn_token_usage(TurnTokenUsageFact {
turn_id: turn_context.sub_id.clone(),
thread_id: self.conversation_id.to_string(),
token_usage: turn_token_usage.clone(),
});
self.services.session_telemetry.histogram(
TURN_TOKEN_USAGE_METRIC,
turn_token_usage.total_tokens,

View File

@@ -15,6 +15,7 @@ use crate::shell_snapshot::ShellSnapshot;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::tasks::interrupted_turn_history_marker;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
use codex_exec_server::EnvironmentManager;
@@ -210,6 +211,7 @@ pub(crate) struct ThreadManagerState {
mcp_manager: Arc<McpManager>,
skills_watcher: Arc<SkillsWatcher>,
session_source: SessionSource,
analytics_events_client: Option<AnalyticsEventsClient>,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
}
@@ -221,6 +223,7 @@ impl ThreadManager {
session_source: SessionSource,
collaboration_modes_config: CollaborationModesConfig,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
) -> Self {
let codex_home = config.codex_home.clone();
let restriction_product = session_source.restriction_product();
@@ -259,6 +262,7 @@ impl ThreadManager {
skills_watcher,
auth_manager,
session_source,
analytics_events_client,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -328,6 +332,7 @@ impl ThreadManager {
skills_watcher,
auth_manager,
session_source: SessionSource::Exec,
analytics_events_client: None,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -916,6 +921,7 @@ impl ThreadManagerState {
inherited_exec_policy,
user_shell_override,
parent_trace,
analytics_events_client: self.analytics_events_client.clone(),
})
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)

View File

@@ -299,6 +299,7 @@ async fn new_uses_configured_openai_provider_for_model_refresh() {
Arc::new(codex_exec_server::EnvironmentManager::new(
/*exec_server_url*/ None,
)),
/*analytics_events_client*/ None,
);
let _ = manager.list_models(RefreshStrategy::Online).await;
@@ -435,6 +436,7 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
Arc::new(codex_exec_server::EnvironmentManager::new(
/*exec_server_url*/ None,
)),
/*analytics_events_client*/ None,
);
let source = manager
@@ -537,6 +539,7 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
Arc::new(codex_exec_server::EnvironmentManager::new(
/*exec_server_url*/ None,
)),
/*analytics_events_client*/ None,
);
let source = manager
@@ -629,6 +632,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
Arc::new(codex_exec_server::EnvironmentManager::new(
/*exec_server_url*/ None,
)),
/*analytics_events_client*/ None,
);
let source = manager

View File

@@ -8,9 +8,21 @@ use crate::event_mapping;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::RolloutItem;
pub(crate) fn initial_history_has_prior_user_turns(conversation_history: &InitialHistory) -> bool {
conversation_history.scan_rollout_items(rollout_item_is_user_turn_boundary)
}
fn rollout_item_is_user_turn_boundary(item: &RolloutItem) -> bool {
match item {
RolloutItem::ResponseItem(item) => is_user_turn_boundary(item),
_ => false,
}
}
/// Return the indices of user message boundaries in a rollout.
///
/// A user message boundary is a `RolloutItem::ResponseItem(ResponseItem::Message { .. })`

View File

@@ -519,6 +519,7 @@ impl TestCodexBuilder {
SessionSource::Exec,
CollaborationModesConfig::default(),
Arc::clone(&environment_manager),
/*analytics_events_client*/ None,
)
} else {
codex_core::test_support::thread_manager_with_models_provider_and_home(

View File

@@ -1099,6 +1099,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Arc::new(codex_exec_server::EnvironmentManager::new(
/*exec_server_url*/ None,
)),
/*analytics_events_client*/ None,
);
let NewThread { thread: codex, .. } = thread_manager
.start_thread(config)

View File

@@ -70,6 +70,7 @@ impl MessageProcessor {
.enabled(Feature::DefaultModeRequestUserInput),
},
environment_manager,
/*analytics_events_client*/ None,
));
Self {
outgoing,

View File

@@ -2279,6 +2279,14 @@ pub enum InitialHistory {
}
impl InitialHistory {
pub fn scan_rollout_items(&self, mut predicate: impl FnMut(&RolloutItem) -> bool) -> bool {
match self {
InitialHistory::New => false,
InitialHistory::Resumed(resumed) => resumed.history.iter().any(&mut predicate),
InitialHistory::Forked(items) => items.iter().any(predicate),
}
}
pub fn forked_from_id(&self) -> Option<ThreadId> {
match self {
InitialHistory::New => None,