pivot to consuming from app-event stream

This commit is contained in:
Roy Han
2026-03-26 10:56:26 -07:00
parent 71d88a63c2
commit 5567bf4aed
13 changed files with 467 additions and 215 deletions

View File

@@ -1,10 +1,23 @@
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::write_chatgpt_auth;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::config::ConfigBuilder;
use codex_core::config::types::OtelExporterKind;
use codex_core::config::types::OtelHttpProtocol;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
const SERVICE_VERSION: &str = "0.0.0-test";
@@ -65,3 +78,81 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> {
assert_eq!(has_metrics, true);
Ok(())
}
pub(crate) async fn enable_analytics_capture(server: &MockServer, codex_home: &Path) -> Result<()> {
Mock::given(method("POST"))
.and(path("/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200))
.mount(server)
.await;
write_chatgpt_auth(
codex_home,
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
Ok(())
}
pub(crate) async fn wait_for_analytics_payload(
server: &MockServer,
read_timeout: Duration,
) -> Result<Value> {
let body = timeout(read_timeout, async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
Ok(serde_json::from_slice(&body).expect("analytics payload"))
}
pub(crate) fn thread_initialized_event(payload: &Value) -> &Value {
payload["events"]
.as_array()
.expect("events array")
.iter()
.find(|event| event["event_type"] == "codex_thread_initialized")
.expect("codex_thread_initialized event should be present")
}
pub(crate) fn assert_basic_thread_initialized_event(
event: &Value,
thread_id: &str,
initialization_mode: &str,
) {
assert_eq!(event["event_params"]["thread_id"], thread_id);
assert_eq!(
event["event_params"]["product_client_id"],
DEFAULT_CLIENT_NAME
);
assert_eq!(event["event_params"]["model"], "mock-model");
assert_eq!(event["event_params"]["ephemeral"], false);
assert_eq!(event["event_params"]["session_source"], "user");
assert_eq!(
event["event_params"]["subagent_source"],
serde_json::Value::Null
);
assert_eq!(
event["event_params"]["parent_thread_id"],
serde_json::Value::Null
);
assert_eq!(
event["event_params"]["initialization_mode"],
initialization_mode
);
assert!(event["event_params"]["created_at"].as_u64().is_some());
}