cover centralized response analytics path

This commit is contained in:
Roy Han
2026-04-28 11:02:41 -07:00
parent 42cb76b669
commit 122a4f62e1

View File

@@ -701,34 +701,179 @@ pub(crate) struct OutgoingError {
mod tests {
use std::time::Duration;
use codex_analytics::AppServerRpcTransport;
use codex_analytics::AuthManagerRetention;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApprovalsReviewer;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ModelRerouteReason;
use codex_app_server_protocol::ModelReroutedNotification;
use codex_app_server_protocol::ModelVerification;
use codex_app_server_protocol::ModelVerificationNotification;
use codex_app_server_protocol::PermissionProfile;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_app_server_protocol::SandboxPolicy;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::ThreadId;
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
use tokio::time::timeout;
use uuid::Uuid;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
fn sample_thread_start_response(thread_id: &str) -> ClientResponsePayload {
ClientResponsePayload::ThreadStart(ThreadStartResponse {
thread: Thread {
id: thread_id.to_string(),
forked_from_id: None,
preview: "first prompt".to_string(),
ephemeral: false,
model_provider: "openai".to_string(),
created_at: 1,
updated_at: 2,
status: ThreadStatus::Idle,
path: None,
cwd: test_path_buf("/tmp").abs(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Exec,
agent_nickname: None,
agent_role: None,
git_info: None,
name: None,
turns: Vec::new(),
},
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AskForApproval::OnFailure,
approvals_reviewer: ApprovalsReviewer::User,
sandbox: SandboxPolicy::DangerFullAccess,
permission_profile: Some(PermissionProfile::from(
codex_protocol::models::PermissionProfile::Disabled,
)),
reasoning_effort: None,
})
}
fn sample_turn_start_request(thread_id: &str) -> ClientRequest {
ClientRequest::TurnStart {
request_id: RequestId::Integer(2),
params: TurnStartParams {
thread_id: thread_id.to_string(),
input: Vec::new(),
..Default::default()
},
}
}
fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnStart(TurnStartResponse {
turn: Turn {
id: turn_id.to_string(),
items: Vec::new(),
status: TurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
})
}
fn sample_turn_steer_request(thread_id: &str, turn_id: &str) -> ClientRequest {
ClientRequest::TurnSteer {
request_id: RequestId::Integer(3),
params: TurnSteerParams {
thread_id: thread_id.to_string(),
expected_turn_id: turn_id.to_string(),
input: vec![UserInput::Text {
text: "more".to_string(),
text_elements: Vec::new(),
}],
responsesapi_client_metadata: None,
},
}
}
fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload {
ClientResponsePayload::TurnSteer(TurnSteerResponse {
turn_id: turn_id.to_string(),
})
}
async fn wait_for_analytics_event(server: &MockServer, event_type: &str) -> Value {
timeout(Duration::from_secs(1), async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
for request in &requests {
if request.method != "POST"
|| request.url.path() != "/codex/analytics-events/events"
{
continue;
}
let payload: Value =
serde_json::from_slice(&request.body).expect("valid analytics payload");
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
return event.clone();
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("analytics event before timeout")
}
#[test]
fn verify_server_notification_serialization() {
let notification =
@@ -1057,6 +1202,87 @@ mod tests {
assert_eq!(outgoing.request_context_count().await, 0);
}
#[tokio::test]
async fn send_response_tracks_accepted_turn_steer_analytics() {
let analytics_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200))
.mount(&analytics_server)
.await;
let analytics_events_client = codex_analytics::AnalyticsEventsClient::new(
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
analytics_server.uri(),
/*analytics_enabled*/ Some(true),
AuthManagerRetention::Strong,
);
analytics_events_client.track_initialize(
/*connection_id*/ 7,
InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
"codex-tui".to_string(),
AppServerRpcTransport::Stdio,
);
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx, analytics_events_client.clone());
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(1),
},
sample_thread_start_response("thread-1"),
)
.await;
analytics_events_client.track_request(
/*connection_id*/ 7,
RequestId::Integer(2),
&sample_turn_start_request("thread-1"),
);
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(2),
},
sample_turn_start_response("turn-1"),
)
.await;
analytics_events_client.track_request(
/*connection_id*/ 7,
RequestId::Integer(3),
&sample_turn_steer_request("thread-1", "turn-1"),
);
outgoing
.send_response(
ConnectionRequestId {
connection_id: ConnectionId(7),
request_id: RequestId::Integer(3),
},
sample_turn_steer_response("turn-1"),
)
.await;
let event = wait_for_analytics_event(&analytics_server, "codex_turn_steer_event").await;
assert_eq!(event["event_params"]["thread_id"], "thread-1");
assert_eq!(event["event_params"]["expected_turn_id"], "turn-1");
assert_eq!(event["event_params"]["accepted_turn_id"], "turn-1");
assert_eq!(event["event_params"]["result"], "accepted");
}
#[tokio::test]
async fn send_error_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);