diff --git a/scripts/plugin_remote_api_latency_probe.py b/scripts/plugin_remote_api_latency_probe.py new file mode 100644 index 0000000000..9eead048ed --- /dev/null +++ b/scripts/plugin_remote_api_latency_probe.py @@ -0,0 +1,414 @@ +#!/usr/bin/env python3 +"""Measure latency for production plugin APIs used by Codex plugin loading.""" + +from __future__ import annotations + +import argparse +import csv +import json +import math +import statistics +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +DEFAULT_BASE_URL = "https://chatgpt.com" +DEFAULT_AUTH_JSON = "~/.codex/auth.json" +DEFAULT_OUTPUT_DIR = "/tmp" + + +@dataclass(frozen=True) +class ProbeEndpoint: + label: str + path: str + timeout_sec: float + + +ENDPOINTS: tuple[ProbeEndpoint, ...] = ( + ProbeEndpoint( + label="workspace_installed_for_shared", + path="/backend-api/ps/plugins/installed?scope=WORKSPACE", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="workspace_shared", + path="/backend-api/ps/plugins/workspace/shared?limit=200", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="workspace_directory", + path="/backend-api/ps/plugins/list?scope=WORKSPACE&limit=200", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="global_directory", + path="/backend-api/ps/plugins/list?scope=GLOBAL&limit=200", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="global_installed", + path="/backend-api/ps/plugins/installed?scope=GLOBAL", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="featured_plugin_ids", + path="/backend-api/plugins/featured?platform=codex", + timeout_sec=10.0, + ), + ProbeEndpoint( + label="created_by_me_workspace", + path="/backend-api/ps/plugins/workspace/created?limit=200", + timeout_sec=30.0, + ), + ProbeEndpoint( + label="workspace_installed_after_created", + path="/backend-api/ps/plugins/installed?scope=WORKSPACE", + timeout_sec=30.0, + ), +) + + +CSV_FIELDS = ( + "iteration", + "order", + "label", + "method", + "url_path_query", + "started_at", + "ended_at", + "latency_ms", + "latency_ns", + "status", + "success", + "response_bytes", + "request_id", + "error", +) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Sequentially benchmark production Codex plugin-loading APIs. " + "The default run performs 100 iterations of all endpoints." + ) + ) + parser.add_argument("--iterations", type=int, default=100) + parser.add_argument("--call-gap-sec", type=float, default=1.0) + parser.add_argument("--iteration-gap-sec", type=float, default=10.0) + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--auth-json", default=DEFAULT_AUTH_JSON) + parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR) + parser.add_argument( + "--user-agent", + default="codex-plugin-api-latency-probe/1.0", + help="User-Agent sent to production APIs.", + ) + return parser.parse_args() + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat(timespec="milliseconds") + + +def load_auth_headers(auth_json_path: str, user_agent: str) -> dict[str, str]: + path = Path(auth_json_path).expanduser() + try: + payload = json.loads(path.read_text()) + except FileNotFoundError as exc: + raise SystemExit(f"auth file not found: {path}") from exc + except json.JSONDecodeError as exc: + raise SystemExit(f"auth file is not valid JSON: {path}: {exc}") from exc + + tokens = payload.get("tokens") + if not isinstance(tokens, dict): + raise SystemExit(f"auth file does not contain tokens object: {path}") + + access_token = tokens.get("access_token") + if not isinstance(access_token, str) or not access_token.strip(): + raise SystemExit( + "auth file does not contain tokens.access_token. " + "This probe requires ChatGPT token auth from ~/.codex/auth.json." + ) + + headers = { + "Authorization": f"Bearer {access_token}", + "Accept": "application/json", + "originator": "codex_cli_rs", + "User-Agent": user_agent, + } + + account_id = tokens.get("account_id") + if isinstance(account_id, str) and account_id.strip(): + headers["ChatGPT-Account-ID"] = account_id + + return headers + + +def join_url(base_url: str, path: str) -> str: + base_url = base_url.rstrip("/") + if not path.startswith("/"): + path = f"/{path}" + return f"{base_url}{path}" + + +def response_request_id(headers: Any) -> str: + for name in ( + "x-request-id", + "openai-request-id", + "cf-ray", + "x-envoy-upstream-service-time", + ): + value = headers.get(name) + if value: + return str(value) + return "" + + +def perform_request( + endpoint: ProbeEndpoint, + base_url: str, + headers: dict[str, str], + iteration: int, + order: int, +) -> dict[str, Any]: + url = join_url(base_url, endpoint.path) + request = urllib.request.Request(url, headers=headers, method="GET") + started_at = utc_now_iso() + started_ns = time.perf_counter_ns() + status = "" + response_bytes = 0 + request_id = "" + error = "" + + try: + with urllib.request.urlopen(request, timeout=endpoint.timeout_sec) as response: + body = response.read() + status = str(response.status) + response_bytes = len(body) + request_id = response_request_id(response.headers) + except urllib.error.HTTPError as exc: + body = exc.read() + status = str(exc.code) + response_bytes = len(body) + request_id = response_request_id(exc.headers) + error = f"HTTPError: {exc.code} {exc.reason}" + except urllib.error.URLError as exc: + error = f"URLError: {exc.reason}" + except TimeoutError as exc: + error = f"TimeoutError: {exc}" + except Exception as exc: # Keep the long-running probe alive across one bad call. + error = f"{type(exc).__name__}: {exc}" + + ended_ns = time.perf_counter_ns() + ended_at = utc_now_iso() + latency_ns = ended_ns - started_ns + latency_ms = latency_ns / 1_000_000 + success = status.startswith("2") and not error + + return { + "iteration": iteration, + "order": order, + "label": endpoint.label, + "method": "GET", + "url_path_query": endpoint.path, + "started_at": started_at, + "ended_at": ended_at, + "latency_ms": f"{latency_ms:.3f}", + "latency_ns": latency_ns, + "status": status, + "success": str(success).lower(), + "response_bytes": response_bytes, + "request_id": request_id, + "error": error, + } + + +def nearest_rank(values: list[float], percentile: float) -> float | None: + if not values: + return None + sorted_values = sorted(values) + rank = max(1, math.ceil((percentile / 100) * len(sorted_values))) + return sorted_values[rank - 1] + + +def summarize_rows(rows: list[dict[str, Any]]) -> dict[str, Any]: + summaries: dict[str, Any] = {} + for endpoint in ENDPOINTS: + endpoint_rows = [row for row in rows if row["label"] == endpoint.label] + latencies = [float(row["latency_ms"]) for row in endpoint_rows] + success_rows = [row for row in endpoint_rows if row["success"] == "true"] + error_rows = [row for row in endpoint_rows if row["success"] != "true"] + non_2xx_rows = [ + row + for row in endpoint_rows + if not str(row["status"]).startswith("2") + ] + + if latencies: + summary = { + "count": len(endpoint_rows), + "success_count": len(success_rows), + "error_count": len(error_rows), + "non_2xx_count": len(non_2xx_rows), + "min_ms": min(latencies), + "mean_ms": statistics.fmean(latencies), + "p50_ms": statistics.median(latencies), + "p90_ms": nearest_rank(latencies, 90), + "p95_ms": nearest_rank(latencies, 95), + "p99_ms": nearest_rank(latencies, 99), + "max_ms": max(latencies), + "stdev_ms": statistics.stdev(latencies) if len(latencies) > 1 else 0.0, + } + else: + summary = { + "count": 0, + "success_count": 0, + "error_count": 0, + "non_2xx_count": 0, + "min_ms": None, + "mean_ms": None, + "p50_ms": None, + "p90_ms": None, + "p95_ms": None, + "p99_ms": None, + "max_ms": None, + "stdev_ms": None, + } + + summaries[endpoint.label] = summary + + return { + "generated_at": utc_now_iso(), + "percentile_method": "nearest_rank", + "endpoints": summaries, + } + + +def write_outputs( + rows: list[dict[str, Any]], + csv_path: Path, + summary_path: Path, +) -> dict[str, Any]: + with csv_path.open("w", newline="") as file: + writer = csv.DictWriter(file, fieldnames=CSV_FIELDS) + writer.writeheader() + writer.writerows(rows) + + summary = summarize_rows(rows) + summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + return summary + + +def print_summary(summary: dict[str, Any]) -> None: + print() + print("Latency summary, milliseconds") + print( + "label,count,success,error,non_2xx,min,mean,p50,p90,p95,p99,max,stdev" + ) + for endpoint in ENDPOINTS: + item = summary["endpoints"][endpoint.label] + values = [ + endpoint.label, + item["count"], + item["success_count"], + item["error_count"], + item["non_2xx_count"], + item["min_ms"], + item["mean_ms"], + item["p50_ms"], + item["p90_ms"], + item["p95_ms"], + item["p99_ms"], + item["max_ms"], + item["stdev_ms"], + ] + print( + ",".join( + f"{value:.3f}" if isinstance(value, float) else str(value) + for value in values + ) + ) + + +def validate_args(args: argparse.Namespace) -> None: + if args.iterations < 1: + raise SystemExit("--iterations must be >= 1") + if args.call_gap_sec < 0: + raise SystemExit("--call-gap-sec must be >= 0") + if args.iteration_gap_sec < 0: + raise SystemExit("--iteration-gap-sec must be >= 0") + + +def main() -> int: + args = parse_args() + validate_args(args) + headers = load_auth_headers(args.auth_json, args.user_agent) + + output_dir = Path(args.output_dir).expanduser() + output_dir.mkdir(parents=True, exist_ok=True) + stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + csv_path = output_dir / f"codex-plugin-api-latency-{stamp}.csv" + summary_path = output_dir / f"codex-plugin-api-latency-{stamp}.summary.json" + + rows: list[dict[str, Any]] = [] + total_calls = args.iterations * len(ENDPOINTS) + completed_calls = 0 + + print(f"base_url={args.base_url.rstrip('/')}") + print(f"iterations={args.iterations}") + print(f"call_gap_sec={args.call_gap_sec}") + print(f"iteration_gap_sec={args.iteration_gap_sec}") + print(f"csv_path={csv_path}") + print(f"summary_path={summary_path}") + print(f"total_calls={total_calls}") + print() + + try: + for iteration in range(1, args.iterations + 1): + for order, endpoint in enumerate(ENDPOINTS, start=1): + row = perform_request( + endpoint=endpoint, + base_url=args.base_url, + headers=headers, + iteration=iteration, + order=order, + ) + rows.append(row) + completed_calls += 1 + status = row["status"] or "no-status" + outcome = "ok" if row["success"] == "true" else "error" + print( + f"[{completed_calls}/{total_calls}] " + f"iteration={iteration} order={order} label={endpoint.label} " + f"status={status} outcome={outcome} " + f"latency_ms={row['latency_ms']}" + ) + sys.stdout.flush() + + if order != len(ENDPOINTS) and args.call_gap_sec > 0: + time.sleep(args.call_gap_sec) + + if iteration != args.iterations and args.iteration_gap_sec > 0: + time.sleep(args.iteration_gap_sec) + except KeyboardInterrupt: + print() + print("Interrupted; writing partial outputs.") + + summary = write_outputs(rows, csv_path, summary_path) + print_summary(summary) + print() + print(f"Wrote CSV: {csv_path}") + print(f"Wrote summary: {summary_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())