mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
6 Commits
cooper/cod
...
codex/add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0abb0f161 | ||
|
|
9276d1cabd | ||
|
|
babf24a559 | ||
|
|
9441ce2b71 | ||
|
|
7784179e98 | ||
|
|
8c3b21f23d |
@@ -8,6 +8,7 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::HookRunFact;
|
||||
@@ -169,6 +170,21 @@ impl AnalyticsEventsClient {
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_app_server_started(
|
||||
&self,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
duration: Duration,
|
||||
) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::AppServerStarted(AppServerStartedInput {
|
||||
runtime: current_runtime_metadata(),
|
||||
rpc_transport,
|
||||
duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
|
||||
created_at: crate::now_unix_seconds(),
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_guardian_review(
|
||||
&self,
|
||||
tracking: &GuardianReviewTrackContext,
|
||||
|
||||
@@ -43,6 +43,7 @@ use serde::Serialize;
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AppServerRpcTransport {
|
||||
Stdio,
|
||||
UnixSocket,
|
||||
Websocket,
|
||||
InProcess,
|
||||
}
|
||||
@@ -56,6 +57,7 @@ pub(crate) struct TrackEventsRequest {
|
||||
#[serde(untagged)]
|
||||
pub(crate) enum TrackEventRequest {
|
||||
SkillInvocation(SkillInvocationEventRequest),
|
||||
AppServerStarted(CodexAppServerEventRequest),
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
GuardianReview(Box<GuardianReviewEventRequest>),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
@@ -144,6 +146,27 @@ pub(crate) struct CodexRuntimeMetadata {
|
||||
pub(crate) runtime_arch: String,
|
||||
}
|
||||
|
||||
/// Analytics parameters emitted when an app-server runtime starts.
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppServerStartedEventParams {
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
/// Elapsed measured startup duration, in milliseconds from a monotonic clock.
|
||||
pub(crate) duration_ms: u64,
|
||||
/// Time at which the event was recorded, in seconds since the Unix epoch.
|
||||
pub(crate) created_at: u64,
|
||||
}
|
||||
|
||||
/// Analytics events emitted for app-server lifecycle changes.
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "event_type")]
|
||||
pub(crate) enum CodexAppServerEventRequest {
|
||||
#[serde(rename = "codex_app_server_started")]
|
||||
Started {
|
||||
event_params: CodexAppServerStartedEventParams,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializedEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
|
||||
@@ -324,6 +324,7 @@ pub(crate) enum AnalyticsFact {
|
||||
}
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
AppServerStarted(AppServerStartedInput),
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
Compaction(Box<CodexCompactionEvent>),
|
||||
GuardianReview(Box<GuardianReviewEventParams>),
|
||||
@@ -337,6 +338,16 @@ pub(crate) enum CustomAnalyticsFact {
|
||||
PluginStateChanged(PluginStateChangedInput),
|
||||
}
|
||||
|
||||
/// Analytics input captured when an app-server runtime starts.
|
||||
pub(crate) struct AppServerStartedInput {
|
||||
pub runtime: CodexRuntimeMetadata,
|
||||
pub rpc_transport: AppServerRpcTransport,
|
||||
/// Elapsed measured startup duration, in milliseconds from a monotonic clock.
|
||||
pub duration_ms: u64,
|
||||
/// Time at which the event was recorded, in seconds since the Unix epoch.
|
||||
pub created_at: u64,
|
||||
}
|
||||
|
||||
pub(crate) struct SkillInvokedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub invocations: Vec<SkillInvocation>,
|
||||
|
||||
@@ -5,6 +5,8 @@ use crate::accepted_lines::accepted_line_repo_hash_for_cwd;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppServerEventRequest;
|
||||
use crate::events::CodexAppServerStartedEventParams;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCollabAgentToolCallEventParams;
|
||||
use crate::events::CodexCollabAgentToolCallEventRequest;
|
||||
@@ -60,6 +62,7 @@ use crate::events::subagent_thread_started_event_request;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
@@ -446,6 +449,9 @@ impl AnalyticsReducer {
|
||||
self.ingest_server_request_aborted(completed_at_ms, request_id, out);
|
||||
}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::AppServerStarted(input) => {
|
||||
self.ingest_app_server_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
@@ -483,6 +489,23 @@ impl AnalyticsReducer {
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_app_server_started(
|
||||
&mut self,
|
||||
input: AppServerStartedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(TrackEventRequest::AppServerStarted(
|
||||
CodexAppServerEventRequest::Started {
|
||||
event_params: CodexAppServerStartedEventParams {
|
||||
runtime: input.runtime,
|
||||
rpc_transport: input.rpc_transport,
|
||||
duration_ms: input.duration_ms,
|
||||
created_at: input.created_at,
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_initialize(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
|
||||
@@ -7,10 +7,15 @@ use codex_login::AuthManager;
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
default_analytics_enabled: bool,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
Some(
|
||||
config
|
||||
.analytics_enabled
|
||||
.unwrap_or(default_analytics_enabled),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
@@ -370,6 +371,7 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
|
||||
}
|
||||
|
||||
async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let startup_started = Instant::now();
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
@@ -380,8 +382,11 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
|
||||
.await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
|
||||
let analytics_events_client = analytics_events_client_from_config(
|
||||
Arc::clone(&auth_manager),
|
||||
args.config.as_ref(),
|
||||
/*default_analytics_enabled*/ true,
|
||||
);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
@@ -421,6 +426,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let app_server_started_analytics_events_client = analytics_events_client.clone();
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
@@ -442,6 +448,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let session = Arc::new(ConnectionSessionState::new());
|
||||
let mut listen_for_threads = true;
|
||||
let mut app_server_started_tracked = false;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -479,7 +486,17 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
}
|
||||
}
|
||||
Some(ProcessorCommand::Notification(notification)) => {
|
||||
let initialized =
|
||||
matches!(notification, ClientNotification::Initialized);
|
||||
processor.process_client_notification(notification).await;
|
||||
if initialized && !app_server_started_tracked {
|
||||
app_server_started_analytics_events_client
|
||||
.track_app_server_started(
|
||||
AppServerRpcTransport::InProcess,
|
||||
startup_started.elapsed(),
|
||||
);
|
||||
app_server_started_tracked = true;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
@@ -428,6 +429,7 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let startup_started = Instant::now();
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -787,14 +789,19 @@ pub async fn run_main_with_transport_options(
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let auth_manager = Arc::clone(&auth_manager);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let analytics_events_client = analytics_events_client_from_config(
|
||||
Arc::clone(&auth_manager),
|
||||
&config,
|
||||
default_analytics_enabled,
|
||||
);
|
||||
let rpc_transport = analytics_rpc_transport(&transport);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let initialize_notification_sender = outgoing_message_sender.clone();
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
analytics_events_client.track_app_server_started(rpc_transport, startup_started.elapsed());
|
||||
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
@@ -809,7 +816,7 @@ pub async fn run_main_with_transport_options(
|
||||
session_source,
|
||||
auth_manager,
|
||||
installation_id,
|
||||
rpc_transport: analytics_rpc_transport(&transport),
|
||||
rpc_transport,
|
||||
remote_control_handle: Some(remote_control_handle.clone()),
|
||||
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
||||
}));
|
||||
@@ -1086,15 +1093,19 @@ pub async fn run_main_with_transport_options(
|
||||
fn analytics_rpc_transport(transport: &AppServerTransport) -> AppServerRpcTransport {
|
||||
match transport {
|
||||
AppServerTransport::Stdio => AppServerRpcTransport::Stdio,
|
||||
AppServerTransport::UnixSocket { .. }
|
||||
| AppServerTransport::WebSocket { .. }
|
||||
| AppServerTransport::Off => AppServerRpcTransport::Websocket,
|
||||
AppServerTransport::UnixSocket { .. } => AppServerRpcTransport::UnixSocket,
|
||||
AppServerTransport::WebSocket { .. } | AppServerTransport::Off => {
|
||||
AppServerRpcTransport::Websocket
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::LogFormat;
|
||||
use super::analytics_rpc_transport;
|
||||
use crate::transport::AppServerTransport;
|
||||
use codex_analytics::AppServerRpcTransport;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
@@ -1114,4 +1125,16 @@ mod tests {
|
||||
assert_eq!(LogFormat::from_env_value(Some("text")), LogFormat::Default);
|
||||
assert_eq!(LogFormat::from_env_value(Some("jsonl")), LogFormat::Default);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn analytics_rpc_transport_preserves_unix_socket() {
|
||||
let transport = "unix://codex-app-server.sock"
|
||||
.parse::<AppServerTransport>()
|
||||
.expect("unix socket transport should parse");
|
||||
|
||||
assert!(matches!(
|
||||
analytics_rpc_transport(&transport),
|
||||
AppServerRpcTransport::UnixSocket
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,8 +244,11 @@ async fn build_test_processor(
|
||||
Arg0DispatchPaths::default(),
|
||||
Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
|
||||
let analytics_events_client = analytics_events_client_from_config(
|
||||
Arc::clone(&auth_manager),
|
||||
config.as_ref(),
|
||||
/*default_analytics_enabled*/ true,
|
||||
);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
|
||||
@@ -1,15 +1,28 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::ChatGptAuthFixture;
|
||||
use app_test_support::DEFAULT_CLIENT_NAME;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::write_chatgpt_auth;
|
||||
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
use codex_app_server::in_process;
|
||||
use codex_app_server::in_process::InProcessStartArgs;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::CloudRequirementsLoader;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_config::types::OtelExporterKind;
|
||||
use codex_config::types::OtelHttpProtocol;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
@@ -79,13 +92,161 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn standalone_app_server_startup_tracks_analytics_event() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
mount_analytics_capture(&server, codex_home.path()).await?;
|
||||
|
||||
let _mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
||||
let event =
|
||||
wait_for_analytics_event(&server, Duration::from_secs(10), "codex_app_server_started")
|
||||
.await?;
|
||||
|
||||
assert_app_server_started_event(&event, "stdio");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn standalone_app_server_startup_respects_default_disabled_analytics() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
mount_analytics_endpoint(&server).await;
|
||||
write_analytics_auth(codex_home.path())?;
|
||||
|
||||
let _mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
||||
|
||||
assert_analytics_event_not_received(&server, Duration::from_secs(1), "codex_app_server_started")
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn embedded_app_server_startup_tracks_analytics_event() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
mount_analytics_capture(&server, codex_home.path()).await?;
|
||||
|
||||
let client =
|
||||
in_process::start(in_process_start_args(codex_home.path(), "codex-tui").await?).await?;
|
||||
|
||||
let event =
|
||||
wait_for_analytics_event(&server, Duration::from_secs(10), "codex_app_server_started")
|
||||
.await?;
|
||||
|
||||
assert_app_server_started_event(&event, "in_process");
|
||||
client.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn embedded_app_server_failed_initialize_does_not_track_startup_event() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
let codex_home = TempDir::new()?;
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
&server.uri(),
|
||||
)?;
|
||||
mount_analytics_capture(&server, codex_home.path()).await?;
|
||||
|
||||
let result =
|
||||
in_process::start(in_process_start_args(codex_home.path(), "bad\rname").await?).await;
|
||||
let Err(error) = result else {
|
||||
anyhow::bail!("in-process start should reject an invalid client name");
|
||||
};
|
||||
assert!(
|
||||
error
|
||||
.to_string()
|
||||
.contains("in-process initialize failed: Invalid clientInfo.name")
|
||||
);
|
||||
|
||||
assert_analytics_event_not_received(&server, Duration::from_secs(1), "codex_app_server_started")
|
||||
.await
|
||||
}
|
||||
|
||||
async fn in_process_start_args(codex_home: &Path, client_name: &str) -> Result<InProcessStartArgs> {
|
||||
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(codex_home.to_path_buf())
|
||||
.fallback_cwd(Some(codex_home.to_path_buf()))
|
||||
.loader_overrides(loader_overrides.clone())
|
||||
.build()
|
||||
.await?;
|
||||
Ok(InProcessStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: Arc::new(config),
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides,
|
||||
strict_config: false,
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
state_db: None,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::Cli,
|
||||
enable_codex_api_key_env: false,
|
||||
initialize: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: client_name.to_string(),
|
||||
title: None,
|
||||
version: "0.1.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
})
|
||||
}
|
||||
|
||||
fn assert_app_server_started_event(event: &Value, rpc_transport: &str) {
|
||||
assert_eq!(event["event_params"]["rpc_transport"], rpc_transport);
|
||||
assert!(event["event_params"]["duration_ms"].as_u64().is_some());
|
||||
assert!(event["event_params"]["created_at"].as_u64().is_some());
|
||||
assert!(
|
||||
event["event_params"]["runtime"]["codex_rs_version"]
|
||||
.as_str()
|
||||
.is_some()
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) async fn mount_analytics_capture(server: &MockServer, codex_home: &Path) -> Result<()> {
|
||||
mount_analytics_endpoint(server).await;
|
||||
|
||||
let config_path = codex_home.join("config.toml");
|
||||
let config = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
config_path,
|
||||
format!("{config}\n[analytics]\nenabled = true\n"),
|
||||
)?;
|
||||
|
||||
write_analytics_auth(codex_home)
|
||||
}
|
||||
|
||||
async fn mount_analytics_endpoint(server: &MockServer) {
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/codex/analytics-events/events"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn write_analytics_auth(codex_home: &Path) -> Result<()> {
|
||||
write_chatgpt_auth(
|
||||
codex_home,
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
@@ -98,26 +259,57 @@ pub(crate) async fn mount_analytics_capture(server: &MockServer, codex_home: &Pa
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_payload(
|
||||
async fn assert_analytics_event_not_received(
|
||||
server: &MockServer,
|
||||
wait_duration: Duration,
|
||||
event_type: &str,
|
||||
) -> Result<()> {
|
||||
tokio::time::sleep(wait_duration).await;
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
return Ok(());
|
||||
};
|
||||
for request in &requests {
|
||||
if request.method != "POST" || request.url.path() != "/codex/analytics-events/events" {
|
||||
continue;
|
||||
}
|
||||
let payload: Value = serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
|
||||
if payload["events"]
|
||||
.as_array()
|
||||
.is_some_and(|events| events.iter().any(|event| event["event_type"] == event_type))
|
||||
{
|
||||
anyhow::bail!("received unexpected {event_type} analytics event");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_thread_initialized_payload(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
) -> Result<Value> {
|
||||
let body = timeout(read_timeout, async {
|
||||
timeout(read_timeout, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
|
||||
}) {
|
||||
break request.body.clone();
|
||||
for request in &requests {
|
||||
if request.method != "POST"
|
||||
|| request.url.path() != "/codex/analytics-events/events"
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let payload: Value = serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
|
||||
if thread_initialized_event(&payload).is_ok() {
|
||||
return Ok::<Value, anyhow::Error>(payload);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_event(
|
||||
|
||||
@@ -763,6 +763,7 @@ async fn plugin_install_tracks_remote_plugin_analytics_event() -> Result<()> {
|
||||
)
|
||||
.await;
|
||||
configure_remote_plugin_test(codex_home.path(), &server)?;
|
||||
enable_analytics(codex_home.path())?;
|
||||
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
|
||||
mount_empty_remote_installed_plugins(&server).await;
|
||||
mount_remote_plugin_install(&server, REMOTE_PLUGIN_ID).await;
|
||||
@@ -1333,7 +1334,16 @@ plugins = true
|
||||
fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
|
||||
std::fs::write(
|
||||
codex_home.join("config.toml"),
|
||||
format!("chatgpt_base_url = \"{base_url}\"\n"),
|
||||
format!("chatgpt_base_url = \"{base_url}\"\n\n[analytics]\nenabled = true\n"),
|
||||
)
|
||||
}
|
||||
|
||||
fn enable_analytics(codex_home: &std::path::Path) -> std::io::Result<()> {
|
||||
let config_path = codex_home.join("config.toml");
|
||||
let config = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
config_path,
|
||||
format!("{config}\n[analytics]\nenabled = true\n"),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1352,15 +1362,22 @@ async fn wait_for_plugin_analytics_payload(server: &MockServer) -> Result<serde_
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
for request in requests.iter().filter(|request| {
|
||||
request.method == "POST"
|
||||
&& request
|
||||
.url
|
||||
.path()
|
||||
.ends_with("/codex/analytics-events/events")
|
||||
}) {
|
||||
return serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"));
|
||||
let payload: serde_json::Value = serde_json::from_slice(&request.body)
|
||||
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
|
||||
if payload["events"].as_array().is_some_and(|events| {
|
||||
events
|
||||
.iter()
|
||||
.any(|event| event["event_type"] == "codex_plugin_installed")
|
||||
}) {
|
||||
return Ok::<serde_json::Value, anyhow::Error>(payload);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
|
||||
@@ -336,12 +336,17 @@ async fn plugin_share_save_rejects_when_plugin_sharing_disabled() -> Result<()>
|
||||
let plugin_root = TempDir::new()?;
|
||||
let plugin_path = write_test_plugin(plugin_root.path(), "demo-plugin")?;
|
||||
let server = MockServer::start().await;
|
||||
// This test verifies sharing makes no backend request; startup analytics
|
||||
// would otherwise be an unrelated request to the same mock server.
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
chatgpt_base_url = "{}/backend-api"
|
||||
|
||||
[analytics]
|
||||
enabled = false
|
||||
|
||||
[features]
|
||||
plugins = true
|
||||
remote_plugin = true
|
||||
|
||||
@@ -87,7 +87,7 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
format!(
|
||||
"chatgpt_base_url = \"{}\"\n\n[features]\nplugins = true\n\n[plugins.\"sample-plugin@debug\"]\nenabled = true\n",
|
||||
"chatgpt_base_url = \"{}\"\n\n[features]\nplugins = true\n\n[analytics]\nenabled = true\n\n[plugins.\"sample-plugin@debug\"]\nenabled = true\n",
|
||||
analytics_server.uri()
|
||||
),
|
||||
)?;
|
||||
@@ -122,16 +122,23 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
if let Some(request) = requests.iter().find(|request| {
|
||||
for request in requests.iter().filter(|request| {
|
||||
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
|
||||
}) {
|
||||
break request.body.clone();
|
||||
let payload: serde_json::Value =
|
||||
serde_json::from_slice(&request.body).expect("analytics payload");
|
||||
if payload["events"].as_array().is_some_and(|events| {
|
||||
events
|
||||
.iter()
|
||||
.any(|event| event["event_type"] == "codex_plugin_uninstalled")
|
||||
}) {
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
|
||||
@@ -139,6 +139,14 @@ impl BlockingRemoteControlBackend {
|
||||
&remote_control_url,
|
||||
&remote_control_url,
|
||||
)?;
|
||||
// This fixture implements remote-control enrollment only; prevent the
|
||||
// startup event from becoming the first request to its blocking server.
|
||||
let config_path = codex_home.join("config.toml");
|
||||
let config = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
config_path,
|
||||
format!("{config}\n[analytics]\nenabled = false\n"),
|
||||
)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home,
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
|
||||
@@ -44,7 +44,7 @@ use wiremock::matchers::path;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_thread_initialized_payload;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
@@ -403,7 +403,7 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let payload = wait_for_thread_initialized_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
assert_basic_thread_initialized_event(
|
||||
event,
|
||||
|
||||
@@ -98,7 +98,7 @@ use wiremock::matchers::path;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_thread_initialized_payload;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
@@ -424,7 +424,7 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
);
|
||||
assert_eq!(thread.thread_source, Some(ThreadSource::User));
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let payload = wait_for_thread_initialized_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
assert_basic_thread_initialized_event(
|
||||
event,
|
||||
|
||||
@@ -47,7 +47,7 @@ use wiremock::matchers::path;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
use super::analytics::wait_for_thread_initialized_payload;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
@@ -436,7 +436,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
|
||||
|
||||
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
let payload = wait_for_thread_initialized_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
||||
assert_eq!(payload["events"].as_array().expect("events array").len(), 1);
|
||||
let event = thread_initialized_event(&payload)?;
|
||||
assert_basic_thread_initialized_event(
|
||||
|
||||
Reference in New Issue
Block a user