diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 2e955bc012..d8185a3215 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -701,34 +701,179 @@ pub(crate) struct OutgoingError { mod tests { use std::time::Duration; + use codex_analytics::AppServerRpcTransport; + use codex_analytics::AuthManagerRetention; use codex_app_server_protocol::AccountLoginCompletedNotification; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::ApplyPatchApprovalParams; + use codex_app_server_protocol::ApprovalsReviewer; + use codex_app_server_protocol::AskForApproval; use codex_app_server_protocol::AuthMode; + use codex_app_server_protocol::ClientInfo; + use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::GuardianWarningNotification; + use codex_app_server_protocol::InitializeCapabilities; + use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::ModelRerouteReason; use codex_app_server_protocol::ModelReroutedNotification; use codex_app_server_protocol::ModelVerification; use codex_app_server_protocol::ModelVerificationNotification; + use codex_app_server_protocol::PermissionProfile; use codex_app_server_protocol::RateLimitSnapshot; use codex_app_server_protocol::RateLimitWindow; + use codex_app_server_protocol::SandboxPolicy; use codex_app_server_protocol::ServerResponse; + use codex_app_server_protocol::SessionSource; + use codex_app_server_protocol::Thread; + use codex_app_server_protocol::ThreadStartResponse; + use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::ToolRequestUserInputParams; + use codex_app_server_protocol::Turn; + use codex_app_server_protocol::TurnStartParams; + use codex_app_server_protocol::TurnStartResponse; + use codex_app_server_protocol::TurnStatus; + use codex_app_server_protocol::TurnSteerParams; + use codex_app_server_protocol::TurnSteerResponse; + use codex_app_server_protocol::UserInput; + use codex_login::AuthManager; + use codex_login::CodexAuth; use codex_protocol::ThreadId; + use codex_utils_absolute_path::test_support::PathBufExt; + use codex_utils_absolute_path::test_support::test_path_buf; use pretty_assertions::assert_eq; + use serde_json::Value; use serde_json::json; use std::sync::Arc; use tokio::time::timeout; use uuid::Uuid; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::method; + use wiremock::matchers::path; use super::*; + fn sample_thread_start_response(thread_id: &str) -> ClientResponsePayload { + ClientResponsePayload::ThreadStart(ThreadStartResponse { + thread: Thread { + id: thread_id.to_string(), + forked_from_id: None, + preview: "first prompt".to_string(), + ephemeral: false, + model_provider: "openai".to_string(), + created_at: 1, + updated_at: 2, + status: ThreadStatus::Idle, + path: None, + cwd: test_path_buf("/tmp").abs(), + cli_version: "0.0.0".to_string(), + source: SessionSource::Exec, + agent_nickname: None, + agent_role: None, + git_info: None, + name: None, + turns: Vec::new(), + }, + model: "gpt-5".to_string(), + model_provider: "openai".to_string(), + service_tier: None, + cwd: test_path_buf("/tmp").abs(), + instruction_sources: Vec::new(), + approval_policy: AskForApproval::OnFailure, + approvals_reviewer: ApprovalsReviewer::User, + sandbox: SandboxPolicy::DangerFullAccess, + permission_profile: Some(PermissionProfile::from( + codex_protocol::models::PermissionProfile::Disabled, + )), + reasoning_effort: None, + }) + } + + fn sample_turn_start_request(thread_id: &str) -> ClientRequest { + ClientRequest::TurnStart { + request_id: RequestId::Integer(2), + params: TurnStartParams { + thread_id: thread_id.to_string(), + input: Vec::new(), + ..Default::default() + }, + } + } + + fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload { + ClientResponsePayload::TurnStart(TurnStartResponse { + turn: Turn { + id: turn_id.to_string(), + items: Vec::new(), + status: TurnStatus::InProgress, + error: None, + started_at: None, + completed_at: None, + duration_ms: None, + }, + }) + } + + fn sample_turn_steer_request(thread_id: &str, turn_id: &str) -> ClientRequest { + ClientRequest::TurnSteer { + request_id: RequestId::Integer(3), + params: TurnSteerParams { + thread_id: thread_id.to_string(), + expected_turn_id: turn_id.to_string(), + input: vec![UserInput::Text { + text: "more".to_string(), + text_elements: Vec::new(), + }], + responsesapi_client_metadata: None, + }, + } + } + + fn sample_turn_steer_response(turn_id: &str) -> ClientResponsePayload { + ClientResponsePayload::TurnSteer(TurnSteerResponse { + turn_id: turn_id.to_string(), + }) + } + + async fn wait_for_analytics_event(server: &MockServer, event_type: &str) -> Value { + timeout(Duration::from_secs(1), async { + loop { + let Some(requests) = server.received_requests().await else { + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + }; + for request in &requests { + if request.method != "POST" + || request.url.path() != "/codex/analytics-events/events" + { + continue; + } + let payload: Value = + serde_json::from_slice(&request.body).expect("valid analytics payload"); + let Some(events) = payload["events"].as_array() else { + continue; + }; + if let Some(event) = events + .iter() + .find(|event| event["event_type"] == event_type) + { + return event.clone(); + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("analytics event before timeout") + } + #[test] fn verify_server_notification_serialization() { let notification = @@ -1057,6 +1202,87 @@ mod tests { assert_eq!(outgoing.request_context_count().await, 0); } + #[tokio::test] + async fn send_response_tracks_accepted_turn_steer_analytics() { + let analytics_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/codex/analytics-events/events")) + .respond_with(ResponseTemplate::new(200)) + .mount(&analytics_server) + .await; + + let analytics_events_client = codex_analytics::AnalyticsEventsClient::new( + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()), + analytics_server.uri(), + /*analytics_enabled*/ Some(true), + AuthManagerRetention::Strong, + ); + analytics_events_client.track_initialize( + /*connection_id*/ 7, + InitializeParams { + client_info: ClientInfo { + name: "codex-tui".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: false, + opt_out_notification_methods: None, + }), + }, + "codex-tui".to_string(), + AppServerRpcTransport::Stdio, + ); + + let (tx, _rx) = mpsc::channel::(4); + let outgoing = OutgoingMessageSender::new(tx, analytics_events_client.clone()); + outgoing + .send_response( + ConnectionRequestId { + connection_id: ConnectionId(7), + request_id: RequestId::Integer(1), + }, + sample_thread_start_response("thread-1"), + ) + .await; + + analytics_events_client.track_request( + /*connection_id*/ 7, + RequestId::Integer(2), + &sample_turn_start_request("thread-1"), + ); + outgoing + .send_response( + ConnectionRequestId { + connection_id: ConnectionId(7), + request_id: RequestId::Integer(2), + }, + sample_turn_start_response("turn-1"), + ) + .await; + + analytics_events_client.track_request( + /*connection_id*/ 7, + RequestId::Integer(3), + &sample_turn_steer_request("thread-1", "turn-1"), + ); + outgoing + .send_response( + ConnectionRequestId { + connection_id: ConnectionId(7), + request_id: RequestId::Integer(3), + }, + sample_turn_steer_response("turn-1"), + ) + .await; + + let event = wait_for_analytics_event(&analytics_server, "codex_turn_steer_event").await; + assert_eq!(event["event_params"]["thread_id"], "thread-1"); + assert_eq!(event["event_params"]["expected_turn_id"], "turn-1"); + assert_eq!(event["event_params"]["accepted_turn_id"], "turn-1"); + assert_eq!(event["event_params"]["result"], "accepted"); + } + #[tokio::test] async fn send_error_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4);